ACCUMULO-3147 Create replication table at initialize, upgrade

  Migrate all replication code to a built-in system-managed replication
  table instead of the user-space one. Ensure all replication-related tests
  pass. Also fix a small bug in Mock which threw an exception when using
  deleteRows with null start and end rows.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e840549a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e840549a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e840549a

Branch: refs/heads/master
Commit: e840549a9c6376c8c5dbe9663e14b4987c10dc5e
Parents: f1a47b3
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Sat Nov 1 18:53:14 2014 -0400
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Thu Nov 6 18:39:55 2014 -0500

----------------------------------------------------------------------
 .../client/impl/ReplicationOperationsImpl.java  |   5 +-
 .../accumulo/core/client/mock/MockAccumulo.java |   2 +
 .../core/client/mock/MockTableOperations.java   |   8 +-
 .../replication/PrintReplicationRecords.java    |   2 +-
 .../core/replication/ReplicationTable.java      |  51 ++++-
 .../ReplicationTableOfflineException.java       |  32 +++
 .../ReplicationOperationsImplTest.java          |  22 +--
 .../apache/accumulo/server/ServerConstants.java |   8 +-
 .../server/constraints/MetadataConstraints.java | 119 ++++++-----
 .../apache/accumulo/server/init/Initialize.java | 174 ++++++++++------
 .../server/replication/ReplicationUtil.java     | 149 +-------------
 .../server/security/SecurityOperation.java      |   3 +-
 .../accumulo/server/util/MetadataTableUtil.java |  24 ++-
 .../replication/ReplicationTableTest.java       | 133 -------------
 .../gc/GarbageCollectWriteAheadLogs.java        |  40 ++--
 .../accumulo/gc/SimpleGarbageCollector.java     |   3 +-
 .../CloseWriteAheadLogReferences.java           |  14 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |   9 +-
 .../CloseWriteAheadLogReferencesTest.java       |  14 +-
 .../accumulo/master/FateServiceHandler.java     |   3 +-
 .../java/org/apache/accumulo/master/Master.java |  21 +-
 .../master/metrics/ReplicationMetrics.java      |   5 +-
 .../DistributedWorkQueueWorkAssigner.java       |   8 +-
 .../master/replication/FinishedWorkUpdater.java |  10 +-
 .../RemoveCompleteReplicationRecords.java       |   8 +-
 .../master/replication/StatusMaker.java         |  16 +-
 .../accumulo/master/replication/WorkMaker.java  |  10 +-
 .../accumulo/master/util/TableValidators.java   |  26 ++-
 .../replication/FinishedWorkUpdaterTest.java    |   7 -
 .../RemoveCompleteReplicationRecordsTest.java   |   8 +-
 .../replication/SequentialWorkAssignerTest.java |  15 +-
 .../replication/UnorderedWorkAssignerTest.java  |  10 +-
 .../master/replication/WorkMakerTest.java       |  15 +-
 .../monitor/servlets/ReplicationServlet.java    |   5 -
 .../replication/ReplicationProcessor.java       |   6 +-
 .../apache/accumulo/test/LargeSplitRowIT.java   |   5 +-
 .../test/MasterRepairsDualAssignmentIT.java     |   8 +-
 .../accumulo/test/MetaConstraintRetryIT.java    |   7 +-
 .../org/apache/accumulo/test/NamespacesIT.java  |   6 +-
 .../org/apache/accumulo/test/ShellServerIT.java |  17 +-
 .../apache/accumulo/test/WaitForBalanceIT.java  |  14 +-
 .../replication/MultiInstanceReplicationIT.java |  19 +-
 .../test/replication/ReplicationIT.java         | 198 +++++++++++++------
 .../test/replication/StatusCombinerMacTest.java |  19 +-
 .../UnorderedWorkAssignerReplicationIT.java     |  11 +-
 45 files changed, 612 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index a8f3f21..cd57c2f 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * 
+ *
  */
 public class ReplicationOperationsImpl implements ReplicationOperations {
   private static final Logger log = 
LoggerFactory.getLogger(ReplicationOperationsImpl.class);
@@ -216,9 +216,6 @@ public class ReplicationOperationsImpl implements 
ReplicationOperations {
 
   protected Text getTableId(Connector conn, String tableName) throws 
AccumuloException, AccumuloSecurityException, TableNotFoundException {
     TableOperations tops = conn.tableOperations();
-    while (!tops.exists(ReplicationTable.NAME)) {
-      UtilWaitThread.sleep(200);
-    }
 
     if (!conn.tableOperations().exists(tableName)) {
       throw new TableNotFoundException(null, tableName, null);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java 
b/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java
index 32dbb28..66035f7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java
@@ -31,6 +31,7 @@ import 
org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.NamespacePermission;
 import org.apache.accumulo.core.security.SystemPermission;
@@ -58,6 +59,7 @@ public class MockAccumulo {
     namespaces.put(Namespaces.ACCUMULO_NAMESPACE, new MockNamespace());
     createTable("root", RootTable.NAME, true, TimeType.LOGICAL);
     createTable("root", MetadataTable.NAME, true, TimeType.LOGICAL);
+    createTable("root", ReplicationTable.NAME, true, TimeType.LOGICAL);
   }
 
   public FileSystem getFileSystem() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
 
b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index 08750fe..59afc8b 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -38,8 +38,8 @@ import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.FindMax;
-import org.apache.accumulo.core.client.impl.TableOperationsHelper;
 import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.client.impl.TableOperationsHelper;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
@@ -376,6 +376,10 @@ class MockTableOperations extends TableOperationsHelper {
       throw new TableNotFoundException(tableName, tableName, "");
     MockTable t = acu.tables.get(tableName);
     Text startText = start != null ? new Text(start) : new Text();
+    if (startText.getLength() == 0 && end == null) {
+      t.table.clear();
+      return;
+    }
     Text endText = end != null ? new Text(end) : new 
Text(t.table.lastKey().getRow().getBytes());
     startText.append(ZERO, 0, 1);
     endText.append(ZERO, 0, 1);
@@ -442,7 +446,7 @@ class MockTableOperations extends TableOperationsHelper {
     try {
       AccumuloVFSClassLoader.loadClass(className, Class.forName(asTypeName));
     } catch (ClassNotFoundException e) {
-      log.warn("Could not load class '"+className+"' with type name 
'"+asTypeName+"' in testClassLoad().", e);
+      log.warn("Could not load class '" + className + "' with type name '" + 
asTypeName + "' in testClassLoad().", e);
       return false;
     }
     return true;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
 
b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
index ee606b5..35b6df6 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * 
+ *
  */
 public class PrintReplicationRecords implements Runnable {
   private static final Logger log = 
LoggerFactory.getLogger(PrintReplicationRecords.class);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
index 2736762..b6d343f 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
@@ -20,12 +20,18 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.impl.Namespaces;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.security.Authorizations;
@@ -35,7 +41,8 @@ import com.google.common.collect.ImmutableMap;
 
 public class ReplicationTable {
 
-  public static final String NAME = "replication";
+  public static final String ID = "+rep";
+  public static final String NAME = Namespaces.ACCUMULO_NAMESPACE + 
".replication";
 
   public static final String COMBINER_NAME = "statuscombiner";
 
@@ -46,16 +53,46 @@ public class ReplicationTable {
   public static final Map<String,Set<Text>> LOCALITY_GROUPS = 
ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, 
WORK_LG_COLFAMS);
   public static final String STATUS_FORMATTER_CLASS_NAME = 
StatusFormatter.class.getName();
 
-  public static Scanner getScanner(Connector conn) throws 
TableNotFoundException {
-    return conn.createScanner(NAME, Authorizations.EMPTY);
+  public static Scanner getScanner(Connector conn) throws 
ReplicationTableOfflineException {
+    try {
+      return conn.createScanner(NAME, Authorizations.EMPTY);
+    } catch (TableNotFoundException e) {
+      throw new AssertionError(NAME + " should exist, but doesn't.");
+    } catch (TableOfflineException e) {
+      throw new ReplicationTableOfflineException(e);
+    }
   }
 
-  public static BatchWriter getBatchWriter(Connector conn) throws 
TableNotFoundException {
-    return conn.createBatchWriter(NAME, new BatchWriterConfig());
+  public static BatchWriter getBatchWriter(Connector conn) throws 
ReplicationTableOfflineException {
+    try {
+      return conn.createBatchWriter(NAME, new BatchWriterConfig());
+    } catch (TableNotFoundException e) {
+      throw new AssertionError(NAME + " should exist, but doesn't.");
+    } catch (TableOfflineException e) {
+      throw new ReplicationTableOfflineException(e);
+    }
   }
 
-  public static BatchScanner getBatchScanner(Connector conn, int queryThreads) 
throws TableNotFoundException {
-    return conn.createBatchScanner(NAME, Authorizations.EMPTY, queryThreads);
+  public static BatchScanner getBatchScanner(Connector conn, int queryThreads) 
throws ReplicationTableOfflineException {
+    try {
+      return conn.createBatchScanner(NAME, Authorizations.EMPTY, queryThreads);
+    } catch (TableNotFoundException e) {
+      throw new AssertionError(NAME + " should exist, but doesn't.");
+    } catch (TableOfflineException e) {
+      throw new ReplicationTableOfflineException(e);
+    }
+  }
+
+  public static boolean isOnline(Connector conn) {
+    return TableState.ONLINE == Tables.getTableState(conn.getInstance(), ID);
+  }
+
+  public static void setOnline(Connector conn) throws 
AccumuloSecurityException, AccumuloException {
+    try {
+      conn.tableOperations().online(NAME, true);
+    } catch (TableNotFoundException e) {
+      throw new AssertionError(NAME + " should exist, but doesn't.");
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTableOfflineException.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTableOfflineException.java
 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTableOfflineException.java
new file mode 100644
index 0000000..3cc56e0
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTableOfflineException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+import org.apache.accumulo.core.client.TableOfflineException;
+
+/**
+ *
+ */
+public class ReplicationTableOfflineException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public ReplicationTableOfflineException(TableOfflineException e) {
+    super(e);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
 
b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
index 54a9f8c..b404d3d 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
@@ -49,7 +49,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class ReplicationOperationsImplTest {
   private static final Logger log = 
LoggerFactory.getLogger(ReplicationOperationsImplTest.class);
@@ -67,14 +67,13 @@ public class ReplicationOperationsImplTest {
   @Test
   public void waitsUntilEntriesAreReplicated() throws Exception {
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create(ReplicationTable.NAME);
     conn.tableOperations().create("foo");
     Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo"));
 
     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = 
"/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
 
     Mutation m = new Mutation(file1);
     StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
@@ -136,7 +135,7 @@ public class ReplicationOperationsImplTest {
     Assert.assertFalse(done.get());
 
     // Remove the replication entries too
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
+    bw = ReplicationTable.getBatchWriter(conn);
     m = new Mutation(file1);
     m.putDelete(StatusSection.NAME, tableId);
     bw.addMutation(m);
@@ -163,7 +162,6 @@ public class ReplicationOperationsImplTest {
   @Test
   public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create(ReplicationTable.NAME);
     conn.tableOperations().create("foo");
     conn.tableOperations().create("bar");
 
@@ -173,7 +171,7 @@ public class ReplicationOperationsImplTest {
     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = 
"/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
 
     Mutation m = new Mutation(file1);
     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
@@ -227,7 +225,7 @@ public class ReplicationOperationsImplTest {
     Assert.assertFalse(done.get());
 
     // Remove the replication entries too
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
+    bw = ReplicationTable.getBatchWriter(conn);
     m = new Mutation(file1);
     m.putDelete(StatusSection.NAME, tableId1);
     bw.addMutation(m);
@@ -247,7 +245,6 @@ public class ReplicationOperationsImplTest {
   @Test
   public void inprogressReplicationRecordsBlockExecution() throws Exception {
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create(ReplicationTable.NAME);
     conn.tableOperations().create("foo");
 
     Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
@@ -255,7 +252,7 @@ public class ReplicationOperationsImplTest {
     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
 
     Mutation m = new Mutation(file1);
     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
@@ -313,7 +310,7 @@ public class ReplicationOperationsImplTest {
     Assert.assertFalse(done.get());
 
     // Remove the replication entries too
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
+    bw = ReplicationTable.getBatchWriter(conn);
     m = new Mutation(file1);
     m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus));
     bw.addMutation(m);
@@ -333,7 +330,6 @@ public class ReplicationOperationsImplTest {
   @Test
   public void laterCreatedLogsDontBlockExecution() throws Exception {
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create(ReplicationTable.NAME);
     conn.tableOperations().create("foo");
 
     Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
@@ -341,7 +337,7 @@ public class ReplicationOperationsImplTest {
     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     Mutation m = new Mutation(file1);
     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
     bw.addMutation(m);
@@ -395,7 +391,7 @@ public class ReplicationOperationsImplTest {
       System.out.println(e.getKey());
     }
 
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
+    bw = ReplicationTable.getBatchWriter(conn);
     m = new Mutation(file1);
     m.putDelete(StatusSection.NAME, tableId1);
     bw.addMutation(m);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java 
b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
index 6c331e0..9cc9bcb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
@@ -47,9 +47,13 @@ public class ServerConstants {
   public static final Integer WIRE_VERSION = 3;
 
   /**
-   * version (7) reflects the change in the representation of trace 
information in TraceRepo
+   * version (7) also reflects the addition of a replication table
    */
-  public static final int DATA_VERSION = 7;
+  public static final int MOVE_TO_REPLICATION_TABLE = 7;
+  /**
+   * this is the current data version
+   */
+  public static final int DATA_VERSION = MOVE_TO_REPLICATION_TABLE;
   /**
    * version (6) reflects the addition of a separate root table 
(ACCUMULO-1481) in version 1.6.0
    */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
 
b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index e4f61f9..487829e 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@ -49,48 +49,47 @@ import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
 public class MetadataConstraints implements Constraint {
-  
+
   private ZooCache zooCache = null;
   private String zooRoot = null;
-  
+
   private static final Logger log = 
Logger.getLogger(MetadataConstraints.class);
-  
+
   private static boolean[] validTableNameChars = new boolean[256];
-  
+
   {
     for (int i = 0; i < 256; i++) {
-      validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= 
'9')) || i == '!';
+      validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= 
'9')) || i == '!' || i == '+';
     }
   }
-  
-  private static final HashSet<ColumnFQ> validColumnQuals = new 
HashSet<ColumnFQ>(Arrays.asList(
-      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, 
TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN,
-      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, 
TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, 
TabletsSection.ServerColumnFamily.TIME_COLUMN,
-      TabletsSection.ServerColumnFamily.LOCK_COLUMN, 
TabletsSection.ServerColumnFamily.FLUSH_COLUMN, 
TabletsSection.ServerColumnFamily.COMPACT_COLUMN));
-  
-  private static final HashSet<Text> validColumnFams = new 
HashSet<Text>(Arrays.asList(TabletsSection.BulkFileColumnFamily.NAME,
-      LogColumnFamily.NAME, ScanFileColumnFamily.NAME, 
DataFileColumnFamily.NAME,
-      TabletsSection.CurrentLocationColumnFamily.NAME, 
TabletsSection.LastLocationColumnFamily.NAME, 
TabletsSection.FutureLocationColumnFamily.NAME,
-      ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME));
-  
+
+  private static final HashSet<ColumnFQ> validColumnQuals = new 
HashSet<ColumnFQ>(Arrays.asList(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN,
+      TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, 
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN,
+      TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, 
TabletsSection.ServerColumnFamily.TIME_COLUMN, 
TabletsSection.ServerColumnFamily.LOCK_COLUMN,
+      TabletsSection.ServerColumnFamily.FLUSH_COLUMN, 
TabletsSection.ServerColumnFamily.COMPACT_COLUMN));
+
+  private static final HashSet<Text> validColumnFams = new 
HashSet<Text>(Arrays.asList(TabletsSection.BulkFileColumnFamily.NAME, 
LogColumnFamily.NAME,
+      ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME, 
TabletsSection.CurrentLocationColumnFamily.NAME, 
TabletsSection.LastLocationColumnFamily.NAME,
+      TabletsSection.FutureLocationColumnFamily.NAME, 
ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME));
+
   private static boolean isValidColumn(ColumnUpdate cu) {
-    
+
     if (validColumnFams.contains(new Text(cu.getColumnFamily())))
       return true;
-    
+
     if (validColumnQuals.contains(new ColumnFQ(cu)))
       return true;
-    
+
     return false;
   }
-  
+
   static private ArrayList<Short> addViolation(ArrayList<Short> lst, int 
violation) {
     if (lst == null)
       lst = new ArrayList<Short>();
     lst.add((short) violation);
     return lst;
   }
-  
+
   static private ArrayList<Short> addIfNotPresent(ArrayList<Short> lst, int 
intViolation) {
     if (lst == null)
       return addViolation(lst, intViolation);
@@ -99,38 +98,38 @@ public class MetadataConstraints implements Constraint {
       return addViolation(lst, intViolation);
     return lst;
   }
-  
+
   @Override
   public List<Short> check(Environment env, Mutation mutation) {
-    
+
     ArrayList<Short> violations = null;
-    
+
     Collection<ColumnUpdate> colUpdates = mutation.getUpdates();
-    
+
     // check the row, it should contains at least one ; or end with <
     boolean containsSemiC = false;
-    
+
     byte[] row = mutation.getRow();
-    
+
     // always allow rows that fall within reserved areas
     if (row.length > 0 && row[0] == '~')
       return null;
     if (row.length > 2 && row[0] == '!' && row[1] == '!' && row[2] == '~')
       return null;
-    
+
     for (byte b : row) {
       if (b == ';') {
         containsSemiC = true;
       }
-      
+
       if (b == ';' || b == '<')
         break;
-      
+
       if (!validTableNameChars[0xff & b]) {
         violations = addIfNotPresent(violations, 4);
       }
     }
-    
+
     if (!containsSemiC) {
       // see if last row char is <
       if (row.length == 0 || row[row.length - 1] != '<') {
@@ -141,38 +140,38 @@ public class MetadataConstraints implements Constraint {
         violations = addIfNotPresent(violations, 4);
       }
     }
-    
+
     if (row.length > 0 && row[0] == '!') {
       if (row.length < 3 || row[1] != '0' || (row[2] != '<' && row[2] != ';')) 
{
         violations = addIfNotPresent(violations, 4);
       }
     }
-    
+
     // ensure row is not less than Constants.METADATA_TABLE_ID
     if (new Text(row).compareTo(new Text(MetadataTable.ID)) < 0) {
       violations = addViolation(violations, 5);
     }
-    
+
     boolean checkedBulk = false;
-    
+
     for (ColumnUpdate columnUpdate : colUpdates) {
       Text columnFamily = new Text(columnUpdate.getColumnFamily());
-      
+
       if (columnUpdate.isDeleted()) {
         if (!isValidColumn(columnUpdate)) {
           violations = addViolation(violations, 2);
         }
         continue;
       }
-      
+
       if (columnUpdate.getValue().length == 0 && 
!columnFamily.equals(ScanFileColumnFamily.NAME)) {
         violations = addViolation(violations, 6);
       }
-      
+
       if (columnFamily.equals(DataFileColumnFamily.NAME)) {
         try {
           DataFileValue dfv = new DataFileValue(columnUpdate.getValue());
-          
+
           if (dfv.getSize() < 0 || dfv.getNumEntries() < 0) {
             violations = addViolation(violations, 1);
           }
@@ -182,7 +181,7 @@ public class MetadataConstraints implements Constraint {
           violations = addViolation(violations, 1);
         }
       } else if (columnFamily.equals(ScanFileColumnFamily.NAME)) {
-        
+
       } else if 
(columnFamily.equals(TabletsSection.BulkFileColumnFamily.NAME)) {
         if (!columnUpdate.isDeleted() && !checkedBulk) {
           // splits, which also write the time reference, are allowed to write 
this reference even when
@@ -193,13 +192,13 @@ public class MetadataConstraints implements Constraint {
           // but it writes everything. We allow it to re-write the bulk 
information if it is setting the location.
           // See ACCUMULO-1230.
           boolean isLocationMutation = false;
-          
+
           HashSet<Text> dataFiles = new HashSet<Text>();
           HashSet<Text> loadedFiles = new HashSet<Text>();
 
           String tidString = new String(columnUpdate.getValue(), UTF_8);
           int otherTidCount = 0;
-          
+
           for (ColumnUpdate update : mutation.getUpdates()) {
             if (new 
ColumnFQ(update).equals(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN)) {
               isSplitMutation = true;
@@ -209,16 +208,16 @@ public class MetadataConstraints implements Constraint {
               dataFiles.add(new Text(update.getColumnQualifier()));
             } else if (new 
Text(update.getColumnFamily()).equals(TabletsSection.BulkFileColumnFamily.NAME))
 {
               loadedFiles.add(new Text(update.getColumnQualifier()));
-              
+
               if (!new String(update.getValue(), UTF_8).equals(tidString)) {
                 otherTidCount++;
               }
             }
           }
-          
+
           if (!isSplitMutation && !isLocationMutation) {
             long tid = Long.parseLong(tidString);
-            
+
             try {
               if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || 
!getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
                 violations = addViolation(violations, 8);
@@ -227,7 +226,7 @@ public class MetadataConstraints implements Constraint {
               violations = addViolation(violations, 8);
             }
           }
-          
+
           checkedBulk = true;
         }
       } else {
@@ -236,11 +235,11 @@ public class MetadataConstraints implements Constraint {
         } else if (new 
ColumnFQ(columnUpdate).equals(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN)
 && columnUpdate.getValue().length > 0
             && (violations == null || !violations.contains((short) 4))) {
           KeyExtent ke = new KeyExtent(new Text(mutation.getRow()), (Text) 
null);
-          
+
           Text per = KeyExtent.decodePrevEndRow(new 
Value(columnUpdate.getValue()));
-          
+
           boolean prevEndRowLessThanEndRow = per == null || ke.getEndRow() == 
null || per.compareTo(ke.getEndRow()) < 0;
-          
+
           if (!prevEndRowLessThanEndRow) {
             violations = addViolation(violations, 3);
           }
@@ -248,28 +247,28 @@ public class MetadataConstraints implements Constraint {
           if (zooCache == null) {
             zooCache = new ZooCache();
           }
-          
+
           if (zooRoot == null) {
             zooRoot = ZooUtil.getRoot(HdfsZooInstance.getInstance());
           }
-          
+
           boolean lockHeld = false;
           String lockId = new String(columnUpdate.getValue(), UTF_8);
-          
+
           try {
             lockHeld = ZooLock.isLockHeld(zooCache, new 
ZooUtil.LockID(zooRoot, lockId));
           } catch (Exception e) {
             log.debug("Failed to verify lock was held " + lockId + " " + 
e.getMessage());
           }
-          
+
           if (!lockHeld) {
             violations = addViolation(violations, 7);
           }
         }
-        
+
       }
     }
-    
+
     if (violations != null) {
       log.debug("violating metadata mutation : " + new 
String(mutation.getRow(), UTF_8));
       for (ColumnUpdate update : mutation.getUpdates()) {
@@ -277,14 +276,14 @@ public class MetadataConstraints implements Constraint {
             + (update.isDeleted() ? "[delete]" : new String(update.getValue(), 
UTF_8)));
       }
     }
-    
+
     return violations;
   }
-  
+
   protected Arbitrator getArbitrator() {
     return new ZooArbitrator();
   }
-  
+
   @Override
   public String getViolationDescription(short violationCode) {
     switch (violationCode) {
@@ -307,11 +306,11 @@ public class MetadataConstraints implements Constraint {
     }
     return null;
   }
-  
+
   @Override
   protected void finalize() {
     if (zooCache != null)
       zooCache.clear();
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 45e883d..24ff637 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -25,10 +25,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 
 import jline.console.ConsoleReader;
@@ -57,10 +60,20 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -91,6 +104,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 
 import com.beust.jcommander.Parameter;
+import com.google.common.base.Joiner;
 
 /**
  * This class is used to setup the directory structure and the root tablet to get an instance started
@@ -99,7 +113,7 @@ import com.beust.jcommander.Parameter;
 public class Initialize {
   private static final Logger log = Logger.getLogger(Initialize.class);
   private static final String DEFAULT_ROOT_USER = "root";
-  public static final String TABLE_TABLETS_TABLET_DIR = "/table_info";
+  private static final String TABLE_TABLETS_TABLET_DIR = "/table_info";
 
   private static ConsoleReader reader = null;
   private static IZooReaderWriter zoo = ZooReaderWriter.getInstance();
@@ -130,6 +144,8 @@ public class Initialize {
   }
 
   private static HashMap<String,String> initialMetadataConf = new 
HashMap<String,String>();
+  private static HashMap<String,String> initialMetadataCombinerConf = new 
HashMap<String,String>();
+  private static HashMap<String,String> initialReplicationTableConf = new 
HashMap<String,String>();
   static {
     
initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), 
"32K");
     initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
@@ -146,13 +162,44 @@ public class Initialize {
     initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + 
"majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName());
     initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
     initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + 
"tablet",
-        String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, 
TabletsSection.CurrentLocationColumnFamily.NAME));
-    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + 
"server", String.format("%s,%s,%s,%s", TabletsSection.DataFileColumnFamily.NAME,
-        TabletsSection.LogColumnFamily.NAME, 
TabletsSection.ServerColumnFamily.NAME, 
TabletsSection.FutureLocationColumnFamily.NAME));
+        String.format("%s,%s", TabletColumnFamily.NAME, 
CurrentLocationColumnFamily.NAME));
+    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + 
"server",
+        String.format("%s,%s,%s,%s", DataFileColumnFamily.NAME, 
LogColumnFamily.NAME, ServerColumnFamily.NAME, 
FutureLocationColumnFamily.NAME));
     initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), 
"tablet,server");
     
initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), 
"");
     initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), 
"true");
     initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), 
"true");
+
+    // ACCUMULO-3077 Set the combiner on accumulo.metadata during init to reduce the likelihood of a race
+    // condition where a tserver compacts away Status updates because it didn't see the Combiner configured
+    IteratorSetting setting = new IteratorSetting(9, 
ReplicationTableUtil.COMBINER_NAME, StatusCombiner.class);
+    Combiner.setColumns(setting, Collections.singletonList(new 
Column(ReplicationSection.COLF)));
+    for (IteratorScope scope : IteratorScope.values()) {
+      String root = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, 
scope.name().toLowerCase(), setting.getName());
+      for (Entry<String,String> prop : setting.getOptions().entrySet()) {
+        initialMetadataCombinerConf.put(root + ".opt." + prop.getKey(), 
prop.getValue());
+      }
+      initialMetadataCombinerConf.put(root, setting.getPriority() + "," + 
setting.getIteratorClass());
+    }
+
+    // add combiners to replication table
+    setting = new IteratorSetting(30, ReplicationTable.COMBINER_NAME, 
StatusCombiner.class);
+    setting.setPriority(30);
+    Combiner.setColumns(setting, Arrays.asList(new Column(StatusSection.NAME), 
new Column(WorkSection.NAME)));
+    for (IteratorScope scope : EnumSet.allOf(IteratorScope.class)) {
+      String root = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, 
scope.name().toLowerCase(), setting.getName());
+      for (Entry<String,String> prop : setting.getOptions().entrySet()) {
+        initialReplicationTableConf.put(root + ".opt." + prop.getKey(), 
prop.getValue());
+      }
+      initialReplicationTableConf.put(root, setting.getPriority() + "," + 
setting.getIteratorClass());
+    }
+    // add locality groups to replication table
+    for (Entry<String,Set<Text>> g : 
ReplicationTable.LOCALITY_GROUPS.entrySet()) {
+      initialReplicationTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX + 
g.getKey(), LocalityGroupUtil.encodeColumnFamilies(g.getValue()));
+    }
+    initialReplicationTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), 
Joiner.on(",").join(ReplicationTable.LOCALITY_GROUPS.keySet()));
+    // add formatter to replication table
+    initialReplicationTableConf.put(Property.TABLE_FORMATTER_CLASS.getKey(), 
ReplicationTable.STATUS_FORMATTER_CLASS_NAME);
   }
 
   static boolean checkInit(Configuration conf, VolumeManager fs, 
SiteConfiguration sconf) throws IOException {
@@ -305,60 +352,79 @@ public class Initialize {
   private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, 
String rootTabletDir) throws IOException {
     initDirs(fs, uuid, 
VolumeConfiguration.getVolumeUris(SiteConfiguration.getInstance()), false);
 
-    // initialize initial metadata config in zookeeper
-    initMetadataConfig();
+    // initialize initial system tables config in zookeeper
+    initSystemTablesConfig();
 
     String tableMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + 
Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID
         + TABLE_TABLETS_TABLET_DIR;
+    String replicationTableDefaultTabletDir = 
fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + 
Path.SEPARATOR + ReplicationTable.ID
+        + Constants.DEFAULT_TABLET_LOCATION;
+
     String defaultMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) 
+ Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID
         + Constants.DEFAULT_TABLET_LOCATION;
 
     // create table and default tablets directories
-    createDirectories(fs, rootTabletDir, tableMetadataTabletDir, 
defaultMetadataTabletDir);
+    createDirectories(fs, rootTabletDir, tableMetadataTabletDir, 
defaultMetadataTabletDir, replicationTableDefaultTabletDir);
+
+    String ext = 
FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
 
-    // populate the root tablet with info about the metadata tablets
-    String fileName = rootTabletDir + Path.SEPARATOR + "00000_00000." + 
FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
-    createMetadataFile(fs, fileName, MetadataTable.ID, tableMetadataTabletDir, 
defaultMetadataTabletDir);
+    // populate the metadata tables tablet with info about the replication table's one initial tablet
+    String metadataFileName = tableMetadataTabletDir + Path.SEPARATOR + "0_1." 
+ ext;
+    Tablet replicationTablet = new Tablet(ReplicationTable.ID, 
replicationTableDefaultTabletDir, null, null);
+    createMetadataFile(fs, metadataFileName, replicationTablet);
+
+    // populate the root tablet with info about the metadata table's two initial tablets
+    String rootTabletFileName = rootTabletDir + Path.SEPARATOR + 
"00000_00000." + ext;
+    Text splitPoint = TabletsSection.getRange().getEndKey().getRow();
+    Tablet tablesTablet = new Tablet(MetadataTable.ID, tableMetadataTabletDir, 
null, splitPoint, metadataFileName);
+    Tablet defaultTablet = new Tablet(MetadataTable.ID, 
defaultMetadataTabletDir, splitPoint, null);
+    createMetadataFile(fs, rootTabletFileName, tablesTablet, defaultTablet);
   }
 
-  /**
-   * Create an rfile in the default tablet's directory for a new table. This method is used to create the initial root tablet contents, with information about
-   * the metadata table's tablets
-   *
-   * @param volmanager
-   *          The VolumeManager
-   * @param fileName
-   *          The location to create the file
-   * @param tableId
-   *          TableID that is being "created"
-   * @param tableTabletDir
-   *          The table_info directory for the new table
-   * @param defaultTabletDir
-   *          The default_tablet directory for the new table
-   */
-  private static void createMetadataFile(VolumeManager volmanager, String 
fileName, String tableId, String tableTabletDir, String defaultTabletDir)
-      throws IOException {
+  private static class Tablet {
+    String tableId, dir;
+    Text prevEndRow, endRow;
+    String[] files;
+
+    Tablet(String tableId, String dir, Text prevEndRow, Text endRow, String... 
files) {
+      this.tableId = tableId;
+      this.dir = dir;
+      this.prevEndRow = prevEndRow;
+      this.endRow = endRow;
+      this.files = files;
+    }
+  }
+
+  private static void createMetadataFile(VolumeManager volmanager, String 
fileName, Tablet... tablets) throws IOException {
+    // sort file contents in memory, then play back to the file
+    TreeMap<Key,Value> sorted = new TreeMap<>();
+    for (Tablet tablet : tablets) {
+      createEntriesForTablet(sorted, tablet);
+    }
     FileSystem fs = volmanager.getVolumeByPath(new 
Path(fileName)).getFileSystem();
     FileSKVWriter tabletWriter = 
FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), 
AccumuloConfiguration.getDefaultConfiguration());
     tabletWriter.startDefaultLocalityGroup();
 
-    Text splitPoint = TabletsSection.getRange().getEndKey().getRow();
-    createEntriesForTablet(tabletWriter, tableId, tableTabletDir, null, 
splitPoint);
-    createEntriesForTablet(tabletWriter, tableId, defaultTabletDir, 
splitPoint, null);
+    for (Entry<Key,Value> entry : sorted.entrySet()) {
+      tabletWriter.append(entry.getKey(), entry.getValue());
+    }
 
     tabletWriter.close();
   }
 
-  private static void createEntriesForTablet(FileSKVWriter writer, String 
tableId, String tabletDir, Text tabletPrevEndRow, Text tabletEndRow)
-      throws IOException {
-    Text extent = new Text(KeyExtent.getMetadataEntry(new Text(tableId), 
tabletEndRow));
-    addEntry(writer, extent, DIRECTORY_COLUMN, new 
Value(tabletDir.getBytes(UTF_8)));
-    addEntry(writer, extent, TIME_COLUMN, new 
Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8)));
-    addEntry(writer, extent, PREV_ROW_COLUMN, 
KeyExtent.encodePrevEndRow(tabletPrevEndRow));
+  private static void createEntriesForTablet(TreeMap<Key,Value> map, Tablet 
tablet) {
+    Value EMPTY_SIZE = new Value("0,0".getBytes(UTF_8));
+    Text extent = new Text(KeyExtent.getMetadataEntry(new 
Text(tablet.tableId), tablet.endRow));
+    addEntry(map, extent, DIRECTORY_COLUMN, new 
Value(tablet.dir.getBytes(UTF_8)));
+    addEntry(map, extent, TIME_COLUMN, new Value((TabletTime.LOGICAL_TIME_ID + 
"0").getBytes(UTF_8)));
+    addEntry(map, extent, PREV_ROW_COLUMN, 
KeyExtent.encodePrevEndRow(tablet.prevEndRow));
+    for (String file : tablet.files) {
+      addEntry(map, extent, new ColumnFQ(DataFileColumnFamily.NAME, new 
Text(file)), EMPTY_SIZE);
+    }
   }
 
-  private static void addEntry(FileSKVWriter writer, Text row, ColumnFQ col, 
Value value) throws IOException {
-    writer.append(new Key(row, col.getColumnFamily(), 
col.getColumnQualifier(), 0), value);
+  private static void addEntry(TreeMap<Key,Value> map, Text row, ColumnFQ col, 
Value value) {
+    map.put(new Key(row, col.getColumnFamily(), col.getColumnQualifier(), 0), 
value);
   }
 
   private static void createDirectories(VolumeManager fs, String... dirs) 
throws IOException {
@@ -404,6 +470,8 @@ public class Initialize {
     TableManager.prepareNewNamespaceState(uuid, 
Namespaces.ACCUMULO_NAMESPACE_ID, Namespaces.ACCUMULO_NAMESPACE, 
NodeExistsPolicy.FAIL);
     TableManager.prepareNewTableState(uuid, RootTable.ID, 
Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE, 
NodeExistsPolicy.FAIL);
     TableManager.prepareNewTableState(uuid, MetadataTable.ID, 
Namespaces.ACCUMULO_NAMESPACE_ID, MetadataTable.NAME, TableState.ONLINE, 
NodeExistsPolicy.FAIL);
+    TableManager.prepareNewTableState(uuid, ReplicationTable.ID, 
Namespaces.ACCUMULO_NAMESPACE_ID, ReplicationTable.NAME, TableState.OFFLINE,
+        NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, 
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, 
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, 
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
@@ -412,8 +480,7 @@ public class Initialize {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, 
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, 
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, 
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
-    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, 
MasterGoalState.NORMAL.toString().getBytes(UTF_8),
-        NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, 
MasterGoalState.NORMAL.toString().getBytes(UTF_8), NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, EMPTY_BYTE_ARRAY, 
NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, 
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, 
EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
@@ -484,7 +551,7 @@ public class Initialize {
         opts.rootpass);
   }
 
-  public static void initMetadataConfig() throws IOException {
+  public static void initSystemTablesConfig() throws IOException {
     try {
       Configuration conf = CachedConfiguration.getInstance();
       int max = conf.getInt("dfs.replication.max", 512);
@@ -500,21 +567,16 @@ public class Initialize {
         if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), 
entry.getValue()))
           throw new IOException("Cannot create per-table property " + 
entry.getKey());
       }
-    } catch (Exception e) {
-      log.fatal("error talking to zookeeper", e);
-      throw new IOException(e);
-    }
-    // ACCUMULO-3077 Set the combiner on accumulo.metadata during init to reduce the likelihood of a race
-    // condition where a tserver compacts away Status updates because it didn't see the Combiner configured
-    IteratorSetting setting = new IteratorSetting(9, 
ReplicationTableUtil.COMBINER_NAME, StatusCombiner.class);
-    Combiner.setColumns(setting, Collections.singletonList(new 
Column(ReplicationSection.COLF)));
-    try {
-      for (IteratorScope scope : IteratorScope.values()) {
-        String root = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, 
scope.name().toLowerCase(), setting.getName());
-        for (Entry<String,String> prop : setting.getOptions().entrySet()) {
-          TablePropUtil.setTableProperty(MetadataTable.ID, root + ".opt." + 
prop.getKey(), prop.getValue());
-        }
-        TablePropUtil.setTableProperty(MetadataTable.ID, root, 
setting.getPriority() + "," + setting.getIteratorClass());
+      // Only add combiner config to accumulo.metadata table (ACCUMULO-3077)
+      for (Entry<String,String> entry : 
initialMetadataCombinerConf.entrySet()) {
+        if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), 
entry.getValue()))
+          throw new IOException("Cannot create per-table property " + 
entry.getKey());
+      }
+
+      // add configuration to the replication table
+      for (Entry<String,String> entry : 
initialReplicationTableConf.entrySet()) {
+        if (!TablePropUtil.setTableProperty(ReplicationTable.ID, 
entry.getKey(), entry.getValue()))
+          throw new IOException("Cannot create per-table property " + 
entry.getKey());
       }
     } catch (Exception e) {
       log.fatal("Error talking to ZooKeeper", e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index e05a08c..50b15f5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -18,9 +18,7 @@ package org.apache.accumulo.server.replication;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -32,10 +30,7 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.replication.ReplicaSystem;
@@ -44,20 +39,16 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.Combiner;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -243,9 +234,9 @@ public class ReplicationUtil {
     if (null != path) {
       Scanner s;
       try {
-        s = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
-      } catch (TableNotFoundException e) {
-        log.debug("Replication table no long exists", e);
+        s = ReplicationTable.getScanner(conn);
+      } catch (ReplicationTableOfflineException e) {
+        log.debug("Replication table no longer online", e);
         return status;
       }
 
@@ -294,136 +285,4 @@ public class ReplicationUtil {
     return newMap;
   }
 
-  public static synchronized void createReplicationTable(Connector conn) {
-    TableOperations tops = conn.tableOperations();
-    if (tops.exists(ReplicationTable.NAME)) {
-      if (configureReplicationTable(conn)) {
-        return;
-      }
-    }
-
-    for (int i = 0; i < 5; i++) {
-      try {
-        if (!tops.exists(ReplicationTable.NAME)) {
-          tops.create(ReplicationTable.NAME, false);
-        }
-        break;
-      } catch (AccumuloException | AccumuloSecurityException e) {
-        log.error("Failed to create replication table", e);
-      } catch (TableExistsException e) {
-        // Shouldn't happen unless someone else made the table
-      }
-      log.error("Retrying table creation in 1 second...");
-      UtilWaitThread.sleep(1000);
-    }
-
-    for (int i = 0; i < 5; i++) {
-      if (configureReplicationTable(conn)) {
-        return;
-      }
-
-      log.error("Failed to configure the replication table, retying...");
-      UtilWaitThread.sleep(1000);
-    }
-
-    throw new RuntimeException("Could not configure replication table");
-  }
-
-  /**
-   * Attempts to configure the replication table, will return false if it fails
-   *
-   * @param conn
-   *          Connector for the instance
-   * @return True if the replication table is properly configured
-   */
-  protected static synchronized boolean configureReplicationTable(Connector 
conn) {
-    try {
-      conn.securityOperations().grantTablePermission("root", 
ReplicationTable.NAME, TablePermission.READ);
-    } catch (AccumuloException | AccumuloSecurityException e) {
-      log.warn("Could not grant root user read access to replication table", 
e);
-      // Should this be fatal? It's only for convenience, all r/w is done by !SYSTEM
-    }
-
-    TableOperations tops = conn.tableOperations();
-    Map<String,EnumSet<IteratorScope>> iterators = null;
-    try {
-      iterators = tops.listIterators(ReplicationTable.NAME);
-    } catch (AccumuloSecurityException | AccumuloException | 
TableNotFoundException e) {
-      log.error("Could not fetch iterators for " + ReplicationTable.NAME, e);
-      return false;
-    }
-
-    if (!iterators.containsKey(ReplicationTable.COMBINER_NAME)) {
-      // Set our combiner and combine all columns
-      IteratorSetting setting = new IteratorSetting(30, 
ReplicationTable.COMBINER_NAME, StatusCombiner.class);
-      Combiner.setColumns(setting, Arrays.asList(new 
Column(StatusSection.NAME), new Column(WorkSection.NAME)));
-      try {
-        tops.attachIterator(ReplicationTable.NAME, setting);
-      } catch (AccumuloSecurityException | AccumuloException | 
TableNotFoundException e) {
-        log.error("Could not set StatusCombiner on replication table", e);
-        return false;
-      }
-    }
-
-    Map<String,Set<Text>> localityGroups;
-    try {
-      localityGroups = tops.getLocalityGroups(ReplicationTable.NAME);
-    } catch (TableNotFoundException | AccumuloException e) {
-      log.error("Could not fetch locality groups", e);
-      return false;
-    }
-
-    Set<Text> statusColfams = 
localityGroups.get(ReplicationTable.STATUS_LG_NAME), workColfams = 
localityGroups.get(ReplicationTable.WORK_LG_NAME);
-    if (null == statusColfams || null == workColfams) {
-      try {
-        tops.setLocalityGroups(ReplicationTable.NAME, 
ReplicationTable.LOCALITY_GROUPS);
-      } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
-        log.error("Could not set locality groups on replication table", e);
-        return false;
-      }
-    }
-
-    // Make sure the StatusFormatter is set on the metadata table
-    Iterable<Entry<String,String>> properties;
-    try {
-      properties = tops.getProperties(ReplicationTable.NAME);
-    } catch (AccumuloException | TableNotFoundException e) {
-      log.error("Could not fetch table properties on replication table", e);
-      return false;
-    }
-
-    boolean formatterConfigured = false;
-    for (Entry<String,String> property : properties) {
-      if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) {
-        if 
(!ReplicationTable.STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) {
-          log.info("Changing formatter for {} table from {} to {}", 
ReplicationTable.NAME, property.getValue(), 
ReplicationTable.STATUS_FORMATTER_CLASS_NAME);
-          try {
-            tops.setProperty(ReplicationTable.NAME, 
Property.TABLE_FORMATTER_CLASS.getKey(), 
ReplicationTable.STATUS_FORMATTER_CLASS_NAME);
-          } catch (AccumuloException | AccumuloSecurityException e) {
-            log.error("Could not set formatter on replication table", e);
-            return false;
-          }
-        }
-
-        formatterConfigured = true;
-
-        // Don't need to keep iterating over the properties after we found the one we were looking for
-        break;
-      }
-    }
-
-    if (!formatterConfigured) {
-      try {
-        tops.setProperty(ReplicationTable.NAME, 
Property.TABLE_FORMATTER_CLASS.getKey(), 
ReplicationTable.STATUS_FORMATTER_CLASS_NAME);
-      } catch (AccumuloException | AccumuloSecurityException e) {
-        log.error("Could not set formatter on replication table", e);
-        return false;
-      }
-    }
-
-    log.debug("Successfully configured replication table");
-
-    return true;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index fa90b3e..71dcdcf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.data.thrift.TRange;
 import org.apache.accumulo.core.master.thrift.FateOperation;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.NamespacePermission;
@@ -296,7 +297,7 @@ public class SecurityOperation {
   protected boolean _hasTablePermission(String user, String table, 
TablePermission permission, boolean useCached) throws ThriftSecurityException {
     targetUserExists(user);
 
-    if ((table.equals(MetadataTable.ID) || table.equals(RootTable.ID)) && 
permission.equals(TablePermission.READ))
+    if ((table.equals(MetadataTable.ID) || table.equals(RootTable.ID) || 
table.equals(ReplicationTable.ID)) && permission.equals(TablePermission.READ))
       return true;
 
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 8f84169..903142b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -17,6 +17,9 @@
 package org.apache.accumulo.server.util;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,6 +67,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -82,6 +86,7 @@ import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
@@ -132,10 +137,10 @@ public class MetadataTableUtil {
 
   public static void update(Credentials credentials, ZooLock zooLock, Mutation m, KeyExtent extent) {
     Writer t = extent.isMeta() ? getRootTable(credentials) : getMetadataTable(credentials);
-    update(t, credentials, zooLock, m);
+    update(t, zooLock, m);
   }
 
-  public static void update(Writer t, Credentials credentials, ZooLock zooLock, Mutation m) {
+  public static void update(Writer t, ZooLock zooLock, Mutation m) {
     if (zooLock != null)
       putLockID(zooLock, m);
     while (true) {
@@ -971,6 +976,20 @@ public class MetadataTableUtil {
   }
 
   /**
+   * During an upgrade from 1.6 to 1.7, we need to add the replication table
+   */
+  public static void createReplicationTable(Instance instance, SystemCredentials systemCredentials) throws IOException {
+    String dir = VolumeManagerImpl.get().choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + ReplicationTable.ID
+        + Constants.DEFAULT_TABLET_LOCATION;
+
+    Mutation m = new Mutation(new Text(KeyExtent.getMetadataEntry(new Text(ReplicationTable.ID), null)));
+    m.put(DIRECTORY_COLUMN.getColumnFamily(), DIRECTORY_COLUMN.getColumnQualifier(), 0, new Value(dir.getBytes(UTF_8)));
+    m.put(TIME_COLUMN.getColumnFamily(), TIME_COLUMN.getColumnQualifier(), 0, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8)));
+    m.put(PREV_ROW_COLUMN.getColumnFamily(), PREV_ROW_COLUMN.getColumnQualifier(), 0, KeyExtent.encodePrevEndRow(null));
+    update(getMetadataTable(systemCredentials), null, m);
+  }
+
+  /**
    * During an upgrade we need to move deletion requests for files under the !METADATA table to the root tablet.
    */
   public static void moveMetaDeleteMarkers(Instance instance, Credentials creds) {
@@ -988,7 +1007,6 @@ public class MetadataTableUtil {
         break;
       }
     }
-
   }
 
   public static void moveMetaDeleteMarkersFrom14(Instance instance, Credentials creds) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
deleted file mode 100644
index 44ac3fe..0000000
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.replication;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.IteratorSetting.Column;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.iterators.Combiner;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * 
- */
-public class ReplicationTableTest {
-
-  @Rule
-  public TestName testName = new TestName();
-
-  private MockInstance instance;
-  private Connector conn;
-
-  @Before
-  public void setupMockAccumulo() throws Exception {
-    instance = new MockInstance(testName.getMethodName());
-    conn = instance.getConnector("root", new PasswordToken(""));
-  }
-
-  @Test
-  public void replicationTableCreated() {
-    TableOperations tops = conn.tableOperations();
-    Assert.assertFalse(tops.exists(ReplicationTable.NAME));
-    ReplicationUtil.createReplicationTable(conn);
-    Assert.assertTrue(tops.exists(ReplicationTable.NAME));
-  }
-
-  @Test
-  public void replicationTableDoubleCreate() {
-    TableOperations tops = conn.tableOperations();
-    Assert.assertFalse(tops.exists(ReplicationTable.NAME));
-
-    // Create the table
-    ReplicationUtil.createReplicationTable(conn);
-
-    // Make sure it exists and save off the id
-    Assert.assertTrue(tops.exists(ReplicationTable.NAME));
-    String tableId = tops.tableIdMap().get(ReplicationTable.NAME);
-
-    // Try to make it again, should return quickly
-    ReplicationUtil.createReplicationTable(conn);
-    Assert.assertTrue(tops.exists(ReplicationTable.NAME));
-
-    // Verify we have the same table as previously
-    Assert.assertEquals(tableId, tops.tableIdMap().get(ReplicationTable.NAME));
-  }
-
-  @Test
-  public void configureOnExistingTable() throws Exception {
-    TableOperations tops = conn.tableOperations();
-
-    // Create the table by hand
-    tops.create(ReplicationTable.NAME);
-    Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(ReplicationTable.NAME);
-
-    Assert.assertFalse(iterators.containsKey(ReplicationTable.COMBINER_NAME));
-
-    ReplicationUtil.configureReplicationTable(conn);
-
-    // After configure the iterator should be set
-    iterators = tops.listIterators(ReplicationTable.NAME);
-    Assert.assertTrue(iterators.containsKey(ReplicationTable.COMBINER_NAME));
-
-    // Needs to be set below versioning
-    IteratorSetting expected = new IteratorSetting(30, ReplicationTable.COMBINER_NAME, StatusCombiner.class);
-    Combiner.setColumns(expected, Arrays.asList(new Column(StatusSection.NAME), new Column(WorkSection.NAME)));
-    IteratorSetting setting = tops.getIteratorSetting(ReplicationTable.NAME, ReplicationTable.COMBINER_NAME, IteratorScope.scan);
-
-    Assert.assertEquals(expected, setting);
-
-    // Check for locality groups too
-    Map<String,Set<Text>> expectedLocalityGroups = ImmutableMap.of(ReplicationTable.STATUS_LG_NAME, ReplicationTable.STATUS_LG_COLFAMS,
-        ReplicationTable.WORK_LG_NAME, ReplicationTable.WORK_LG_COLFAMS);
-    Assert.assertEquals(expectedLocalityGroups, tops.getLocalityGroups(ReplicationTable.NAME));
-  }
-
-  @Test
-  public void disablesVersioning() throws Exception {
-    TableOperations tops = conn.tableOperations();
-
-    if (tops.exists(ReplicationTable.NAME)) {
-      tops.delete(ReplicationTable.NAME);
-    }
-
-    ReplicationUtil.createReplicationTable(conn);
-
-    Set<String> iters = tops.listIterators(ReplicationTable.NAME).keySet();
-
-    Assert.assertEquals(Collections.singleton(ReplicationTable.COMBINER_NAME), iters);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 9928d3c..4c0a3a9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
@@ -86,7 +87,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Creates a new GC WAL object.
-   * 
+   *
    * @param instance
    *          instance to use
    * @param fs
@@ -102,7 +103,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Gets the instance used by this object.
-   * 
+   *
    * @return instance
    */
   Instance getInstance() {
@@ -111,7 +112,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Gets the volume manager used by this object.
-   * 
+   *
    * @return volume manager
    */
   VolumeManager getVolumeManager() {
@@ -120,7 +121,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Checks if the volume manager should move files to the trash rather than delete them.
-   * 
+   *
    * @return true if trash is used
    */
   boolean isUsingTrash() {
@@ -277,7 +278,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Converts a list of paths to their corresponding strings.
-   * 
+   *
    * @param paths
    *          list of paths
    * @return string forms of paths
@@ -292,7 +293,7 @@ public class GarbageCollectWriteAheadLogs {
   /**
    * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
    * mapping of file names to paths is skipped.
-   * 
+   *
    * @param fileToServerMap
    *          map of file paths to servers
    * @param nameToFileMap
@@ -379,7 +380,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Determine if the given WAL is needed for replication
-   * 
+   *
    * @param wal
    *          The full path (URI)
    * @return True if the WAL is still needed by replication (not a candidate for deletion)
@@ -447,7 +448,7 @@ public class GarbageCollectWriteAheadLogs {
       replScanner.setRange(Range.exact(wal));
 
       return Iterables.concat(metaScanner, replScanner);
-    } catch (TableNotFoundException e) {
+    } catch (ReplicationTableOfflineException e) {
       // do nothing
     }
 
@@ -458,15 +459,15 @@ public class GarbageCollectWriteAheadLogs {
     return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
   }
 
-  // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
-  @SuppressWarnings("deprecation")
   /**
-   * Scans write-ahead log directories for logs. The maps passed in are
-   * populated with scan information.
+   * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
    *
-   * @param walDirs write-ahead log directories
-   * @param fileToServerMap map of file paths to servers
-   * @param nameToFileMap map of file names to paths
+   * @param walDirs
+   *          write-ahead log directories
+   * @param fileToServerMap
+   *          map of file paths to servers
+   * @param nameToFileMap
+   *          map of file names to paths
    * @return number of servers located (including those with no logs present)
    */
   int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
@@ -484,7 +485,10 @@ public class GarbageCollectWriteAheadLogs {
         continue;
       for (FileStatus status : listing) {
         String server = status.getPath().getName();
-        if (status.isDir()) {
+        // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
+        @SuppressWarnings("deprecation")
+        boolean isDirectory = status.isDir();
+        if (isDirectory) {
           servers.add(server);
           for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
             if (isUUID(file.getPath().getName())) {
@@ -513,7 +517,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Looks for write-ahead logs in recovery directories.
-   * 
+   *
    * @param recoveryDirs
    *          recovery directories
    * @return map of log file names to paths
@@ -540,7 +544,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Checks if a string is a valid UUID.
-   * 
+   *
    * @param name
    *          string to check
    * @return true if string is a UUID

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index bbb2010..01fd2c8 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -63,6 +63,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
@@ -537,7 +538,7 @@ public class SimpleGarbageCollector implements Iface {
           }
 
         });
-      } catch (TableNotFoundException e) {
+      } catch (ReplicationTableOfflineException e) {
         // No elements that we need to preclude
         return Iterators.emptyIterator();
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index d13edf7..1c94253 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -91,8 +91,8 @@ public class CloseWriteAheadLogReferences implements Runnable {
       throw new RuntimeException(e);
     }
 
-    if (!conn.tableOperations().exists(ReplicationTable.NAME)) {
-      log.debug("Replication table doesn't exist, not attempting to clean up wals");
+    if (!ReplicationTable.isOnline(conn)) {
+      log.debug("Replication table isn't online, not attempting to clean up wals");
       return;
     }
 
@@ -124,7 +124,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
 
   /**
    * Construct the set of referenced WALs from the metadata table
-   * 
+   *
    * @param conn
    *          Connector
    * @return The Set of WALs that are referenced in the metadata table
@@ -145,7 +145,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
     BatchScanner bs = null;
     try {
       // TODO Configurable number of threads
-      bs = conn.createBatchScanner(MetadataTable.NAME, new Authorizations(), 4);
+      bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
       bs.setRanges(Collections.singleton(TabletsSection.getRange()));
       bs.fetchColumnFamily(LogColumnFamily.NAME);
 
@@ -176,7 +176,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
 
   /**
    * Given the set of WALs which have references in the metadata table, close any status messages with reference that WAL.
-   * 
+   *
    * @param conn
    *          Connector
    * @param referencedWals
@@ -188,7 +188,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
     long recordsClosed = 0;
     try {
       bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-      bs = conn.createBatchScanner(MetadataTable.NAME, new Authorizations(), 4);
+      bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
       bs.setRanges(Collections.singleton(Range.prefix(ReplicationSection.getRowPrefix())));
       bs.fetchColumnFamily(ReplicationSection.COLF);
 
@@ -239,7 +239,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
 
   /**
    * Write a closed {@link Status} mutation for the given {@link Key} using the provided {@link BatchWriter}
-   * 
+   *
    * @param bw
    *          BatchWriter
    * @param k

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index cd69efb..71e5f7d 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -55,7 +55,6 @@ import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -384,11 +383,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
 
-    ReplicationUtil.createReplicationTable(conn);
-
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     Mutation m = new Mutation("/wals/" + file1);
     StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
     bw.addMutation(m);
@@ -428,8 +425,6 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
 
-    ReplicationUtil.createReplicationTable(conn);
-
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
     // Write some records to the metadata table, we haven't yet written status records to the replication table
@@ -488,8 +483,6 @@ public class GarbageCollectWriteAheadLogsTest {
   public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    ReplicationUtil.createReplicationTable(conn);
-
     long walCreateTime = System.currentTimeMillis();
     String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());

Reply via email to