keith-turner closed pull request #458: fixes #449 fix two bugs with WAL recovery
URL: https://github.com/apache/accumulo/pull/458
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index a0c2a815ce..f87841db38 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3276,12 +3276,12 @@ public int compare(LogEntry e1, LogEntry e2) {
     logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, 
mutationReceiver);
   }
 
-  public int createLogId(KeyExtent tablet) {
-    AccumuloConfiguration acuTableConf = getTableConfiguration(tablet);
-    if (DurabilityImpl.fromString(acuTableConf.get(Property.TABLE_DURABILITY)) 
!= Durability.NONE) {
-      return logIdGenerator.incrementAndGet();
+  public int createLogId() {
+    int logId = logIdGenerator.incrementAndGet();
+    if (logId < 0) {
+      throw new IllegalStateException("Log Id rolled");
     }
-    return -1;
+    return logId;
   }
 
   public TableConfiguration getTableConfiguration(KeyExtent extent) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/CloseableIterator.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/CloseableIterator.java
new file mode 100644
index 0000000000..59623dc38d
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/CloseableIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
+  @Override
+  public void close() throws IOException;
+}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 6aa4964af6..263f0c8d26 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -601,7 +601,7 @@ public synchronized void defineTablet(long seq, int tid, 
KeyExtent tablet) throw
     final LogFileKey key = new LogFileKey();
     key.event = DEFINE_TABLET;
     key.seq = seq;
-    key.tid = tid;
+    key.tabletId = tid;
     key.tablet = tablet;
     try {
       write(key, EMPTY);
@@ -662,7 +662,7 @@ public LoggerOperation logManyTablets(List<TabletMutations> 
mutations) throws IO
       LogFileKey key = new LogFileKey();
       key.event = MANY_MUTATIONS;
       key.seq = tabletMutations.getSeq();
-      key.tid = tabletMutations.getTid();
+      key.tabletId = tabletMutations.getTid();
       LogFileValue value = new LogFileValue();
       value.mutations = tabletMutations.getMutations();
       data.add(new Pair<>(key, value));
@@ -688,7 +688,7 @@ public LoggerOperation minorCompactionFinished(long seq, 
int tid, String fqfn,
     LogFileKey key = new LogFileKey();
     key.event = COMPACTION_FINISH;
     key.seq = seq;
-    key.tid = tid;
+    key.tabletId = tid;
     return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), 
durability);
   }
 
@@ -697,7 +697,7 @@ public LoggerOperation minorCompactionStarted(long seq, int 
tid, String fqfn,
     LogFileKey key = new LogFileKey();
     key.event = COMPACTION_START;
     key.seq = seq;
-    key.tid = tid;
+    key.tabletId = tid;
     key.filename = fqfn;
     return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), 
durability);
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
similarity index 52%
rename from 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
rename to 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
index e354797af8..718fa982d1 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
@@ -18,10 +18,16 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.Objects;
 
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.commons.collections.buffer.PriorityBuffer;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,23 +38,26 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
 /**
- * Provide simple Map.Reader methods over multiple Maps.
+ * A class which reads sorted recovery logs produced from a single WAL.
  *
  * Presently only supports next() and seek() and works on all the Map 
directories within a
  * directory. The primary purpose of this class is to merge the results of 
multiple Reduce jobs that
  * result in Map output files.
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class MultiReader {
+public class RecoveryLogReader implements 
CloseableIterator<Entry<LogFileKey,LogFileValue>> {
 
   /**
    * Group together the next key/value from a Reader with the Reader
-   *
    */
   private static class Index implements Comparable<Index> {
     Reader reader;
-    WritableComparable key;
+    WritableComparable<?> key;
     Writable value;
     boolean cached = false;
 
@@ -62,7 +71,7 @@ private static Object create(java.lang.Class<?> klass) {
 
     public Index(Reader reader) {
       this.reader = reader;
-      key = (WritableComparable) create(reader.getKeyClass());
+      key = (WritableComparable<?>) create(reader.getKeyClass());
       value = (Writable) create(reader.getValueClass());
     }
 
@@ -92,7 +101,9 @@ public int compareTo(Index o) {
           return 1;
         if (!o.cached)
           return -1;
-        return key.compareTo(o.key);
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        int result = ((WritableComparable) key).compareTo(o.key);
+        return result;
       } catch (IOException ex) {
         throw new RuntimeException(ex);
       }
@@ -100,8 +111,14 @@ public int compareTo(Index o) {
   }
 
   private PriorityBuffer heap = new PriorityBuffer();
+  private Iterator<Entry<LogFileKey,LogFileValue>> iter;
+
+  public RecoveryLogReader(VolumeManager fs, Path directory) throws 
IOException {
+    this(fs, directory, null, null);
+  }
 
-  public MultiReader(VolumeManager fs, Path directory) throws IOException {
+  public RecoveryLogReader(VolumeManager fs, Path directory, LogFileKey start, 
LogFileKey end)
+      throws IOException {
     boolean foundFinish = false;
     for (FileStatus child : fs.listStatus(directory)) {
       if (child.getPath().getName().startsWith("_"))
@@ -116,6 +133,8 @@ public MultiReader(VolumeManager fs, Path directory) throws 
IOException {
     if (!foundFinish)
       throw new IOException(
           "Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found 
in " + directory);
+
+    iter = new SortCheckIterator(new RangeIterator(start, end));
   }
 
   private static void copy(Writable src, Writable dest) throws IOException {
@@ -127,7 +146,8 @@ private static void copy(Writable src, Writable dest) 
throws IOException {
     dest.readFields(input);
   }
 
-  public synchronized boolean next(WritableComparable key, Writable val) 
throws IOException {
+  @VisibleForTesting
+  synchronized boolean next(WritableComparable<?> key, Writable val) throws 
IOException {
     Index elt = (Index) heap.remove();
     try {
       elt.cache();
@@ -144,13 +164,14 @@ public synchronized boolean next(WritableComparable key, 
Writable val) throws IO
     return true;
   }
 
-  public synchronized boolean seek(WritableComparable key) throws IOException {
+  @VisibleForTesting
+  synchronized boolean seek(WritableComparable<?> key) throws IOException {
     PriorityBuffer reheap = new PriorityBuffer(heap.size());
     boolean result = false;
     for (Object obj : heap) {
       Index index = (Index) obj;
       try {
-        WritableComparable found = index.reader.getClosest(key, index.value, 
true);
+        WritableComparable<?> found = index.reader.getClosest(key, 
index.value, true);
         if (found != null && found.equals(key)) {
           result = true;
         }
@@ -164,6 +185,7 @@ public synchronized boolean seek(WritableComparable key) 
throws IOException {
     return result;
   }
 
+  @Override
   public void close() throws IOException {
     IOException problem = null;
     for (Object obj : heap) {
@@ -179,4 +201,122 @@ public void close() throws IOException {
     heap = null;
   }
 
+  /**
+   * Ensures source iterator provides data in sorted order
+   */
+  @VisibleForTesting
+  static class SortCheckIterator implements 
Iterator<Entry<LogFileKey,LogFileValue>> {
+
+    private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
+
+    SortCheckIterator(Iterator<Entry<LogFileKey,LogFileValue>> source) {
+      this.source = Iterators.peekingIterator(source);
+
+    }
+
+    @Override
+    public boolean hasNext() {
+      return source.hasNext();
+    }
+
+    @Override
+    public Entry<LogFileKey,LogFileValue> next() {
+      Entry<LogFileKey,LogFileValue> next = source.next();
+      if (source.hasNext()) {
+        
Preconditions.checkState(next.getKey().compareTo(source.peek().getKey()) <= 0,
+            "Keys not in order %s %s", next.getKey(), source.peek().getKey());
+      }
+      return next;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove");
+    }
+  }
+
+  private class RangeIterator implements 
Iterator<Entry<LogFileKey,LogFileValue>> {
+
+    private LogFileKey key = new LogFileKey();
+    private LogFileValue value = new LogFileValue();
+    private boolean hasNext;
+    private LogFileKey end;
+
+    private boolean next(LogFileKey key, LogFileValue value) throws 
IOException {
+      try {
+        return RecoveryLogReader.this.next(key, value);
+      } catch (EOFException e) {
+        return false;
+      }
+    }
+
+    RangeIterator(LogFileKey start, LogFileKey end) throws IOException {
+      this.end = end;
+
+      if (start != null) {
+        hasNext = next(key, value);
+
+        if (hasNext && key.event != LogEvents.OPEN) {
+          throw new IllegalStateException("First log entry value is not OPEN");
+        }
+
+        seek(start);
+      }
+
+      hasNext = next(key, value);
+
+      if (hasNext && start != null && key.compareTo(start) < 0) {
+        throw new IllegalStateException("First key is less than start " + key 
+ " " + start);
+      }
+
+      if (hasNext && end != null && key.compareTo(end) > 0) {
+        hasNext = false;
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return hasNext;
+    }
+
+    @Override
+    public Entry<LogFileKey,LogFileValue> next() {
+      Preconditions.checkState(hasNext);
+      Entry<LogFileKey,LogFileValue> entry = new 
AbstractMap.SimpleImmutableEntry<>(key, value);
+
+      key = new LogFileKey();
+      value = new LogFileValue();
+      try {
+        hasNext = next(key, value);
+        if (hasNext && end != null && key.compareTo(end) > 0) {
+          hasNext = false;
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove");
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iter.hasNext();
+  }
+
+  @Override
+  public Entry<LogFileKey,LogFileValue> next() {
+    return iter.next();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("remove");
+  }
+
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
new file mode 100644
index 0000000000..a68a4ec73b
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * Iterates over multiple sorted recovery logs merging them into a single 
sorted stream.
+ */
+public class RecoveryLogsIterator implements 
CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RecoveryLogsIterator.class);
+
+  List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
+  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+
+  /**
+   * Iterates only over keys in the range [start,end].
+   */
+  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, 
LogFileKey start,
+      LogFileKey end) throws IOException {
+
+    iterators = new ArrayList<>(recoveryLogPaths.size());
+
+    try {
+      for (Path log : recoveryLogPaths) {
+        iterators.add(new RecoveryLogReader(fs, log, start, end));
+      }
+
+      iter = Iterators.mergeSorted(iterators, new 
Comparator<Entry<LogFileKey,LogFileValue>>() {
+        @Override
+        public int compare(Entry<LogFileKey,LogFileValue> o1, 
Entry<LogFileKey,LogFileValue> o2) {
+          return o1.getKey().compareTo(o2.getKey());
+        }
+      });
+
+    } catch (RuntimeException | IOException e) {
+      try {
+        close();
+      } catch (Exception e2) {
+        e.addSuppressed(e2);
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iter.hasNext();
+  }
+
+  @Override
+  public Entry<LogFileKey,LogFileValue> next() {
+    return iter.next();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("remove");
+  }
+
+  @Override
+  public void close() {
+    for (CloseableIterator<?> reader : iterators) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        LOG.debug("Failed to close reader", e);
+      }
+    }
+  }
+}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index a850d2fd79..ebfa92f08a 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -16,50 +16,44 @@
  */
 package org.apache.accumulo.tserver.log;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
 import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
-import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
 /**
  * Extract Mutations for a tablet from a set of logs that have been sorted by 
operation and tablet.
  *
  */
 public class SortedLogRecovery {
-  private static final Logger log = 
LoggerFactory.getLogger(SortedLogRecovery.class);
-
-  static class EmptyMapFileException extends Exception {
-    private static final long serialVersionUID = 1L;
 
-    public EmptyMapFileException() {
-      super();
-    }
-  }
-
-  static class UnusedException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public UnusedException() {
-      super();
-    }
-  }
+  private static final Logger log = 
LoggerFactory.getLogger(SortedLogRecovery.class);
 
   private VolumeManager fs;
 
@@ -67,81 +61,65 @@ public SortedLogRecovery(VolumeManager fs) {
     this.fs = fs;
   }
 
-  private enum Status {
-    INITIAL, LOOKING_FOR_FINISH, COMPLETE
+  static LogFileKey maxKey(LogEvents event) {
+    LogFileKey key = new LogFileKey();
+    key.event = event;
+    key.tabletId = Integer.MAX_VALUE;
+    key.seq = Long.MAX_VALUE;
+    return key;
   }
 
-  private static class LastStartToFinish {
-    long lastStart = -1;
-    long seq = -1;
-    long lastFinish = -1;
-    Status compactionStatus = Status.INITIAL;
-    String tserverSession = "";
-
-    private void update(long newFinish) {
-      this.seq = this.lastStart;
-      if (newFinish != -1)
-        lastFinish = newFinish;
-    }
+  static LogFileKey maxKey(LogEvents event, int tabletId) {
+    LogFileKey key = maxKey(event);
+    key.tabletId = tabletId;
+    return key;
+  }
 
-    private void update(int newStartFile, long newStart) {
-      this.lastStart = newStart;
-    }
+  static LogFileKey minKey(LogEvents event) {
+    LogFileKey key = new LogFileKey();
+    key.event = event;
+    // see GitHub issue #477. There was a bug that caused -1 to end up in 
tabletId. If this happens
+    // want to detect it and fail since recovery is dubious in this situation 
. Other code should
+    // fail if the id is actually -1 in data.
+    key.tabletId = -1;
+    key.seq = 0;
+    return key;
+  }
 
-    private void update(String newSession) {
-      this.lastStart = -1;
-      this.lastFinish = -1;
-      this.compactionStatus = Status.INITIAL;
-      this.tserverSession = newSession;
-    }
+  static LogFileKey minKey(LogEvents event, int tabletId) {
+    LogFileKey key = minKey(event);
+    key.tabletId = tabletId;
+    return key;
   }
 
-  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> 
tabletFiles,
-      MutationReceiver mr) throws IOException {
-    int[] tids = new int[recoveryLogs.size()];
-    LastStartToFinish lastStartToFinish = new LastStartToFinish();
-    for (int i = 0; i < recoveryLogs.size(); i++) {
-      Path logfile = recoveryLogs.get(i);
-      log.info("Looking at mutations from " + logfile + " for " + extent);
-      MultiReader reader = new MultiReader(fs, logfile);
-      try {
-        try {
-          tids[i] = findLastStartToFinish(reader, i, extent, tabletFiles, 
lastStartToFinish);
-        } catch (EmptyMapFileException ex) {
-          log.info("Ignoring empty map file " + logfile);
-          tids[i] = -1;
-        } catch (UnusedException ex) {
-          log.info("Ignoring log file " + logfile + " appears to be unused by 
" + extent);
-          tids[i] = -1;
-        }
-      } finally {
-        try {
-          reader.close();
-        } catch (IOException ex) {
-          log.warn("Ignoring error closing file");
-        }
+  private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogs) 
throws IOException {
+    int tabletId = -1;
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs,
+        minKey(DEFINE_TABLET), maxKey(DEFINE_TABLET))) {
+
+      KeyExtent alternative = extent;
+      if (extent.isRootTablet()) {
+        alternative = RootTable.OLD_EXTENT;
       }
 
-    }
+      while (rli.hasNext()) {
+        LogFileKey key = rli.next().getKey();
+
+        checkState(key.event == DEFINE_TABLET); // should only fail if bug 
elsewhere
 
-    if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
-      throw new RuntimeException("COMPACTION_FINISH (without preceding"
-          + " COMPACTION_START) not followed by successful minor compaction");
-
-    for (int i = 0; i < recoveryLogs.size(); i++) {
-      Path logfile = recoveryLogs.get(i);
-      MultiReader reader = new MultiReader(fs, logfile);
-      try {
-        playbackMutations(reader, tids[i], lastStartToFinish, mr);
-      } finally {
-        try {
-          reader.close();
-        } catch (IOException ex) {
-          log.warn("Ignoring error closing file");
+        if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
+          checkState(key.tabletId >= 0, "tabletId %s for %s is negative", 
key.tabletId, extent);
+          checkState(tabletId == -1 || key.tabletId >= tabletId); // should 
only fail if bug in
+          // RecoveryLogsIterator
+
+          if (tabletId != key.tabletId) {
+            tabletId = key.tabletId;
+          }
         }
       }
-      log.info("Recovery complete for " + extent + " using " + logfile);
     }
+    return tabletId;
   }
 
   private String getPathSuffix(String pathString) {
@@ -151,126 +129,155 @@ private String getPathSuffix(String pathString) {
     return path.getParent().getName() + "/" + path.getName();
   }
 
-  int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent,
-      Set<String> tabletFiles, LastStartToFinish lastStartToFinish)
-      throws IOException, EmptyMapFileException, UnusedException {
+  static class DeduplicatingIterator implements 
Iterator<Entry<LogFileKey,LogFileValue>> {
+
+    private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
+
+    public DeduplicatingIterator(Iterator<Entry<LogFileKey,LogFileValue>> 
source) {
+      this.source = Iterators.peekingIterator(source);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return source.hasNext();
+    }
+
+    @Override
+    public Entry<LogFileKey,LogFileValue> next() {
+      Entry<LogFileKey,LogFileValue> next = source.next();
+
+      while (source.hasNext() && 
next.getKey().compareTo(source.peek().getKey()) == 0) {
+        source.next();
+      }
+
+      return next;
+    }
 
+  }
+
+  private long findLastStartToFinish(List<Path> recoveryLogs, Set<String> 
tabletFiles, int tabletId)
+      throws IOException {
     HashSet<String> suffixes = new HashSet<>();
     for (String path : tabletFiles)
       suffixes.add(getPathSuffix(path));
 
-    // Scan for tableId for this extent (should always be in the log)
-    LogFileKey key = new LogFileKey();
-    LogFileValue value = new LogFileValue();
-    int tid = -1;
-    if (!reader.next(key, value))
-      throw new EmptyMapFileException();
-    if (key.event != OPEN)
-      throw new RuntimeException("First log entry value is not OPEN");
-
-    if (key.tserverSession.compareTo(lastStartToFinish.tserverSession) != 0) {
-      if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
-        throw new RuntimeException("COMPACTION_FINISH (without preceding"
-            + " COMPACTION_START) is not followed by a successful minor 
compaction.");
-      lastStartToFinish.update(key.tserverSession);
-    }
-    KeyExtent alternative = extent;
-    if (extent.isRootTablet()) {
-      alternative = RootTable.OLD_EXTENT;
-    }
+    long lastStart = 0;
+    long recoverySeq = 0;
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs,
+        minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, 
tabletId))) {
+
+      DeduplicatingIterator ddi = new DeduplicatingIterator(rli);
+
+      String lastStartFile = null;
+      LogEvents lastEvent = null;
+      boolean firstEventWasFinish = false;
+      boolean sawStartFinish = false;
+
+      while (ddi.hasNext()) {
+        LogFileKey key = ddi.next().getKey();
 
-    LogFileKey defineKey = null;
-
-    // find the maximum tablet id... because a tablet may leave a tserver and 
then come back, in
-    // which case it would have a different tablet id
-    // for the maximum tablet id, find the minimum sequence #... may be ok to 
find the max seq, but
-    // just want to make the code behave like it used to
-    while (reader.next(key, value)) {
-      // log.debug("Event " + key.event + " tablet " + key.tablet);
-      if (key.event != DEFINE_TABLET)
-        break;
-      if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
-        if (tid != key.tid) {
-          tid = key.tid;
-          defineKey = key;
-          key = new LogFileKey();
+        checkState(key.seq >= 0, "Unexpected negative seq %s for tabletId %s", 
key.seq, tabletId);
+        checkState(key.tabletId == tabletId); // should only fail if bug 
elsewhere
+
+        if (key.event == COMPACTION_START) {
+          checkState(key.seq >= lastStart); // should only fail if bug 
elsewhere
+          lastStart = key.seq;
+          lastStartFile = key.filename;
+        } else if (key.event == COMPACTION_FINISH) {
+          if (lastEvent == null) {
+            firstEventWasFinish = true;
+          } else if (lastEvent == COMPACTION_FINISH) {
+            throw new IllegalStateException(
+                "Saw consecutive COMPACTION_FINISH events " + key.tabletId + " 
" + key.seq);
+          } else {
+            if (key.seq <= lastStart) {
+              throw new IllegalStateException(
+                  "Compaction finish <= start " + lastStart + " " + key.seq);
+            }
+            recoverySeq = lastStart;
+            lastStartFile = null;
+            sawStartFinish = true;
+          }
+        } else {
+          throw new IllegalStateException("Non compaction event seen " + 
key.event);
         }
+
+        lastEvent = key.event;
       }
-    }
-    if (tid < 0) {
-      throw new UnusedException();
-    }
 
-    log.debug("Found tid, seq " + tid + " " + defineKey.seq);
-
-    // Scan start/stop events for this tablet
-    key = defineKey;
-    key.event = COMPACTION_START;
-    reader.seek(key);
-    while (reader.next(key, value)) {
-      // LogFileEntry.printEntry(entry);
-      if (key.tid != tid)
-        break;
-      if (key.event == COMPACTION_START) {
-        if (lastStartToFinish.compactionStatus == Status.INITIAL)
-          lastStartToFinish.compactionStatus = Status.COMPLETE;
-        if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for 
start/stop events: "
-              + key.seq + " vs " + lastStartToFinish.lastStart);
-        lastStartToFinish.update(fileno, key.seq);
-
-        // Tablet server finished the minor compaction, but didn't remove the 
entry from the
-        // METADATA table.
-        log.debug(
-            "minor compaction into " + key.filename + " finished, but was 
still in the METADATA");
-        if (suffixes.contains(getPathSuffix(key.filename)))
-          lastStartToFinish.update(-1);
-      } else if (key.event == COMPACTION_FINISH) {
-        if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for 
start/stop events: "
-              + key.seq + " vs " + lastStartToFinish.lastStart);
-        if (lastStartToFinish.compactionStatus == Status.INITIAL)
-          lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH;
-        else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart)
-          throw new RuntimeException(
-              "COMPACTION_FINISH does not have preceding COMPACTION_START 
event.");
-        else
-          lastStartToFinish.compactionStatus = Status.COMPLETE;
-        lastStartToFinish.update(key.seq);
-      } else
-        break;
+      if (firstEventWasFinish && !sawStartFinish) {
+        throw new IllegalStateException("COMPACTION_FINISH (without preceding 
COMPACTION_START)"
+            + " is not followed by a successful minor compaction.");
+      }
+
+      if (lastStartFile != null && 
suffixes.contains(getPathSuffix(lastStartFile))) {
+        // There was no compaction finish event, however the last compaction 
start event has a file
+        // in the metadata table, so the compaction finished.
+        log.debug("Considering compaction start {} {} finished because file {} 
in metadata table",
+            tabletId, lastStart, getPathSuffix(lastStartFile));
+        recoverySeq = lastStart;
+      }
     }
-    return tid;
+    return recoverySeq;
   }
 
-  private void playbackMutations(MultiReader reader, int tid, 
LastStartToFinish lastStartToFinish,
-      MutationReceiver mr) throws IOException {
-    LogFileKey key = new LogFileKey();
-    LogFileValue value = new LogFileValue();
-
-    // Playback mutations after the last stop to finish
-    log.info("Scanning for mutations starting at sequence number " + 
lastStartToFinish.seq
-        + " for tid " + tid);
-    key.event = MUTATION;
-    key.tid = tid;
-    // the seq number for the minor compaction start is now the same as the
-    // last update made to memory. Scan up to that mutation, but not past it.
-    key.seq = lastStartToFinish.seq;
-    reader.seek(key);
-    while (true) {
-      if (!reader.next(key, value))
-        break;
-      if (key.tid != tid)
-        break;
-      if (key.event == MUTATION) {
-        mr.receive(value.mutations.get(0));
-      } else if (key.event == MANY_MUTATIONS) {
-        for (Mutation m : value.mutations) {
-          mr.receive(m);
+  private void playbackMutations(List<Path> recoveryLogs, MutationReceiver mr, 
int tabletId,
+      long recoverySeq) throws IOException {
+    LogFileKey start = minKey(MUTATION, tabletId);
+    start.seq = recoverySeq;
+
+    LogFileKey end = maxKey(MUTATION, tabletId);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, 
start, end)) {
+      while (rli.hasNext()) {
+        Entry<LogFileKey,LogFileValue> entry = rli.next();
+
+        checkState(entry.getKey().tabletId == tabletId); // should only fail 
if bug elsewhere
+        checkState(entry.getKey().seq >= recoverySeq); // should only fail if 
bug elsewhere
+
+        if (entry.getKey().event == MUTATION) {
+          mr.receive(entry.getValue().mutations.get(0));
+        } else if (entry.getKey().event == MANY_MUTATIONS) {
+          for (Mutation m : entry.getValue().mutations) {
+            mr.receive(m);
+          }
+        } else {
+          throw new IllegalStateException("Non mutation event seen " + 
entry.getKey().event);
         }
-      } else {
-        throw new RuntimeException("unexpected log key type: " + key.event);
       }
     }
   }
+
+  Collection<String> asNames(List<Path> recoveryLogs) {
+    return Collections2.transform(recoveryLogs, new Function<Path,String>() {
+      @Override
+      public String apply(Path input) {
+        return input.getName();
+      }
+    });
+  }
+
+  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> 
tabletFiles,
+      MutationReceiver mr) throws IOException {
+
+    // A tablet may leave a tserver and then come back, in which case it would 
have a different and
+    // higher tablet id. Only want to consider events in the log related to 
the last time the tablet
+    // was loaded.
+    int tabletId = findMaxTabletId(extent, recoveryLogs);
+
+    if (tabletId == -1) {
+      log.info("Tablet {} is not defined in recovery logs {} ", extent, 
asNames(recoveryLogs));
+      return;
+    }
+
+    // Find the seq # for the last compaction that started and finished
+    long recoverySeq = findLastStartToFinish(recoveryLogs, tabletFiles, 
tabletId);
+
+    log.info("Recovering mutations, tablet:{} tabletId:{} seq:{} logs:{}", 
extent, tabletId,
+        recoverySeq, asNames(recoveryLogs));
+
+    // Replay all mutations that were written after the last successful 
compaction started.
+    playbackMutations(recoveryLogs, mr, tabletId, recoverySeq);
+  }
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
index 6cf9d08cac..7eb00cd6cd 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
@@ -34,7 +34,7 @@
   public String filename = null;
   public KeyExtent tablet = null;
   public long seq = -1;
-  public int tid = -1;
+  public int tabletId = -1;
   public static final int VERSION = 2;
   public String tserverSession;
 
@@ -48,35 +48,35 @@ public void readFields(DataInput in) throws IOException {
     event = LogEvents.values()[value];
     switch (event) {
       case OPEN:
-        tid = in.readInt();
+        tabletId = in.readInt();
         tserverSession = in.readUTF();
-        if (tid != VERSION) {
-          throw new RuntimeException(String
-              .format("Bad version number for log file: expected %d, but saw 
%d", VERSION, tid));
+        if (tabletId != VERSION) {
+          throw new RuntimeException(String.format(
+              "Bad version number for log file: expected %d, but saw %d", 
VERSION, tabletId));
         }
         break;
       case COMPACTION_FINISH:
         seq = in.readLong();
-        tid = in.readInt();
+        tabletId = in.readInt();
         break;
       case COMPACTION_START:
         seq = in.readLong();
-        tid = in.readInt();
+        tabletId = in.readInt();
         filename = in.readUTF();
         break;
       case DEFINE_TABLET:
         seq = in.readLong();
-        tid = in.readInt();
+        tabletId = in.readInt();
         tablet = new KeyExtent();
         tablet.readFields(in);
         break;
       case MANY_MUTATIONS:
         seq = in.readLong();
-        tid = in.readInt();
+        tabletId = in.readInt();
         break;
       case MUTATION:
         seq = in.readLong();
-        tid = in.readInt();
+        tabletId = in.readInt();
         break;
       default:
         throw new RuntimeException("Unknown log event type: " + event);
@@ -90,32 +90,32 @@ public void write(DataOutput out) throws IOException {
     switch (event) {
       case OPEN:
         seq = -1;
-        tid = -1;
+        tabletId = -1;
         out.writeInt(VERSION);
         out.writeUTF(tserverSession);
         // out.writeUTF(Accumulo.getInstanceID());
         break;
       case COMPACTION_FINISH:
         out.writeLong(seq);
-        out.writeInt(tid);
+        out.writeInt(tabletId);
         break;
       case COMPACTION_START:
         out.writeLong(seq);
-        out.writeInt(tid);
+        out.writeInt(tabletId);
         out.writeUTF(filename);
         break;
       case DEFINE_TABLET:
         out.writeLong(seq);
-        out.writeInt(tid);
+        out.writeInt(tabletId);
         tablet.write(out);
         break;
       case MANY_MUTATIONS:
         out.writeLong(seq);
-        out.writeInt(tid);
+        out.writeInt(tabletId);
         break;
       case MUTATION:
         out.writeLong(seq);
-        out.writeInt(tid);
+        out.writeInt(tabletId);
         break;
       default:
         throw new IllegalArgumentException("Bad value for LogFileEntry type");
@@ -151,8 +151,8 @@ public int compareTo(LogFileKey o) {
     }
     if (this.event == OPEN)
       return 0;
-    if (this.tid != o.tid) {
-      return this.tid - o.tid;
+    if (this.tabletId != o.tabletId) {
+      return this.tabletId - o.tabletId;
     }
     return sign(this.seq - o.seq);
   }
@@ -176,15 +176,15 @@ public String toString() {
       case OPEN:
         return String.format("OPEN %s", tserverSession);
       case COMPACTION_FINISH:
-        return String.format("COMPACTION_FINISH %d %d", tid, seq);
+        return String.format("COMPACTION_FINISH %d %d", tabletId, seq);
       case COMPACTION_START:
-        return String.format("COMPACTION_START %d %d %s", tid, seq, filename);
+        return String.format("COMPACTION_START %d %d %s", tabletId, seq, 
filename);
       case MUTATION:
-        return String.format("MUTATION %d %d", tid, seq);
+        return String.format("MUTATION %d %d", tabletId, seq);
       case MANY_MUTATIONS:
-        return String.format("MANY_MUTATIONS %d %d", tid, seq);
+        return String.format("MANY_MUTATIONS %d %d", tabletId, seq);
       case DEFINE_TABLET:
-        return String.format("DEFINE_TABLET %d %d %s", tid, seq, tablet);
+        return String.format("DEFINE_TABLET %d %d %s", tabletId, seq, tablet);
     }
     throw new RuntimeException("Unknown type of entry: " + event);
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 927e3458a1..ce465ced24 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -37,7 +38,7 @@
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.MultiReader;
+import org.apache.accumulo.tserver.log.RecoveryLogReader;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -129,9 +130,12 @@ public static void main(String[] args) throws IOException {
         }
       } else {
         // read the log entries sorted in a map file
-        MultiReader input = new MultiReader(fs, path);
-        while (input.next(key, value)) {
-          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, 
opts.maxMutations);
+        try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
+          while (input.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = input.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, 
ke, tabletIds,
+                opts.maxMutations);
+          }
         }
       }
     }
@@ -143,11 +147,11 @@ public static void printLogEvent(LogFileKey key, 
LogFileValue value, Text row, M
     if (ke != null) {
       if (key.event == LogEvents.DEFINE_TABLET) {
         if (key.tablet.equals(ke)) {
-          tabletIds.add(key.tid);
+          tabletIds.add(key.tabletId);
         } else {
           return;
         }
-      } else if (!tabletIds.contains(key.tid)) {
+      } else if (!tabletIds.contains(key.tabletId)) {
         return;
       }
     }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index c2505ff28c..613b13df83 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -720,7 +720,7 @@ protected RFileReplication getKeyValues(ReplicationTarget 
target, DataInputStrea
       switch (key.event) {
         case DEFINE_TABLET:
           if (target.getSourceTableId().equals(key.tablet.getTableId())) {
-            desiredTids.add(key.tid);
+            desiredTids.add(key.tabletId);
           }
           break;
         default:
@@ -772,13 +772,13 @@ protected WalReplication getWalEdits(ReplicationTarget 
target, DataInputStream w
         case DEFINE_TABLET:
           // For new DEFINE_TABLETs, we also need to record the new tids we see
           if (target.getSourceTableId().equals(key.tablet.getTableId())) {
-            desiredTids.add(key.tid);
+            desiredTids.add(key.tabletId);
           }
           break;
         case MUTATION:
         case MANY_MUTATIONS:
           // Only write out mutations for tids that are for the desired tablet
-          if (desiredTids.contains(key.tid)) {
+          if (desiredTids.contains(key.tabletId)) {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos);
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 18d8daf49c..1e0ffcc7b0 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -330,7 +330,7 @@ public Tablet(final TabletServer tabletServer, final 
KeyExtent extent,
     this.splitCreationTime = data.getSplitTime();
     this.tabletTime = TabletTime.getInstance(data.getTime());
     this.persistedTime = tabletTime.getTime();
-    this.logId = tabletServer.createLogId(extent);
+    this.logId = tabletServer.createLogId();
 
     TableConfiguration tblConf = tabletServer.getTableConfiguration(extent);
     if (null == tblConf) {
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEventsTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEventsTest.java
new file mode 100644
index 0000000000..f3bc9049d1
--- /dev/null
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEventsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LogEventsTest {
+  @Test
+  public void testOrdinals() {
+    // Ordinals are used for persistence, so its important they are stable.
+
+    LogEvents[] expectedOrder = new LogEvents[] {LogEvents.OPEN, 
LogEvents.DEFINE_TABLET,
+        LogEvents.MUTATION, LogEvents.MANY_MUTATIONS, 
LogEvents.COMPACTION_START,
+        LogEvents.COMPACTION_FINISH};
+
+    for (int i = 0; i < expectedOrder.length; i++) {
+      Assert.assertEquals(i, expectedOrder[i].ordinal());
+    }
+  }
+}
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogFileKeyTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogFileKeyTest.java
new file mode 100644
index 0000000000..8965094694
--- /dev/null
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogFileKeyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LogFileKeyTest {
+
+  private static LogFileKey nk(LogEvents e, int tabletId, long seq) {
+    LogFileKey k = new LogFileKey();
+    k.event = e;
+    k.tabletId = tabletId;
+    k.seq = seq;
+    return k;
+  }
+
+  @Test
+  public void testEquivalence() {
+
+    LogFileKey start = nk(LogEvents.COMPACTION_START, 1, 3);
+    LogFileKey finish = nk(LogEvents.COMPACTION_FINISH, 1, 3);
+    LogFileKey mut = nk(LogEvents.MUTATION, 1, 3);
+    LogFileKey mmut = nk(LogEvents.MANY_MUTATIONS, 1, 3);
+
+    Assert.assertTrue(start.compareTo(finish) == 0);
+    Assert.assertTrue(finish.compareTo(start) == 0);
+
+    Assert.assertTrue(mut.compareTo(mmut) == 0);
+    Assert.assertTrue(mmut.compareTo(mut) == 0);
+  }
+
+  @Test
+  public void testSortOrder() {
+    List<LogFileKey> keys = new ArrayList<>();
+
+    // add keys in expected sort order
+    keys.add(nk(LogEvents.OPEN, 0, 0));
+
+    // there was a bug that was putting -1 in WAL for tabletId, ensure this 
data sorts as expected
+    keys.add(nk(LogEvents.DEFINE_TABLET, -1, 0));
+    keys.add(nk(LogEvents.DEFINE_TABLET, 3, 6));
+    keys.add(nk(LogEvents.DEFINE_TABLET, 3, 7));
+    keys.add(nk(LogEvents.DEFINE_TABLET, 4, 2));
+    keys.add(nk(LogEvents.DEFINE_TABLET, 4, 9));
+
+    keys.add(nk(LogEvents.COMPACTION_START, 3, 3));
+    keys.add(nk(LogEvents.COMPACTION_FINISH, 3, 5));
+    keys.add(nk(LogEvents.COMPACTION_START, 3, 7));
+    keys.add(nk(LogEvents.COMPACTION_FINISH, 3, 9));
+
+    keys.add(nk(LogEvents.COMPACTION_START, 4, 1));
+    keys.add(nk(LogEvents.COMPACTION_FINISH, 4, 3));
+    keys.add(nk(LogEvents.COMPACTION_START, 4, 11));
+    keys.add(nk(LogEvents.COMPACTION_FINISH, 4, 13));
+
+    keys.add(nk(LogEvents.MUTATION, 3, 1));
+    keys.add(nk(LogEvents.MUTATION, 3, 2));
+    keys.add(nk(LogEvents.MUTATION, 3, 3));
+    keys.add(nk(LogEvents.MUTATION, 3, 3));
+    keys.add(nk(LogEvents.MANY_MUTATIONS, 3, 11));
+
+    keys.add(nk(LogEvents.MANY_MUTATIONS, 4, 2));
+    keys.add(nk(LogEvents.MUTATION, 4, 3));
+    keys.add(nk(LogEvents.MUTATION, 4, 5));
+    keys.add(nk(LogEvents.MUTATION, 4, 7));
+    keys.add(nk(LogEvents.MANY_MUTATIONS, 4, 15));
+
+    for (int i = 0; i < 10; i++) {
+      List<LogFileKey> testList = new ArrayList<>(keys);
+      Collections.shuffle(testList);
+      Collections.sort(testList);
+
+      Assert.assertEquals(keys, testList);
+    }
+
+  }
+}
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
similarity index 77%
rename from 
server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
rename to 
server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
index a6df17cf56..37feafdb64 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/MultiReaderTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
@@ -22,9 +22,17 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.tserver.log.RecoveryLogReader.SortCheckIterator;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -35,7 +43,7 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class MultiReaderTest {
+public class RecoveryLogsReaderTest {
 
   VolumeManager fs;
   TemporaryFolder root = new TemporaryFolder(new 
File(System.getProperty("user.dir") + "/target"));
@@ -73,7 +81,7 @@ public void tearDown() throws Exception {
     root.create();
   }
 
-  private void scan(MultiReader reader, int start) throws IOException {
+  private void scan(RecoveryLogReader reader, int start) throws IOException {
     IntWritable key = new IntWritable();
     BytesWritable value = new BytesWritable();
 
@@ -85,7 +93,7 @@ private void scan(MultiReader reader, int start) throws 
IOException {
     }
   }
 
-  private void scanOdd(MultiReader reader, int start) throws IOException {
+  private void scanOdd(RecoveryLogReader reader, int start) throws IOException 
{
     IntWritable key = new IntWritable();
     BytesWritable value = new BytesWritable();
 
@@ -98,7 +106,7 @@ private void scanOdd(MultiReader reader, int start) throws 
IOException {
   @Test
   public void testMultiReader() throws IOException {
     Path manyMaps = new Path("file://" + root.getRoot().getAbsolutePath() + 
"/manyMaps");
-    MultiReader reader = new MultiReader(fs, manyMaps);
+    RecoveryLogReader reader = new RecoveryLogReader(fs, manyMaps);
     IntWritable key = new IntWritable();
     BytesWritable value = new BytesWritable();
 
@@ -128,7 +136,7 @@ public void testMultiReader() throws IOException {
     reader.close();
 
     fs.deleteRecursively(new Path(manyMaps, "even"));
-    reader = new MultiReader(fs, manyMaps);
+    reader = new RecoveryLogReader(fs, manyMaps);
     key.set(501);
     assertTrue(reader.seek(key));
     scanOdd(reader, 501);
@@ -144,4 +152,29 @@ public void testMultiReader() throws IOException {
 
   }
 
+  @Test(expected = IllegalStateException.class)
+  public void testSortCheck() {
+
+    List<Entry<LogFileKey,LogFileValue>> unsorted = new ArrayList<>();
+
+    LogFileKey k1 = new LogFileKey();
+    k1.event = LogEvents.MANY_MUTATIONS;
+    k1.tabletId = 2;
+    k1.seq = 55;
+
+    LogFileKey k2 = new LogFileKey();
+    k2.event = LogEvents.MANY_MUTATIONS;
+    k2.tabletId = 9;
+    k2.seq = 9;
+
+    unsorted.add(new AbstractMap.SimpleEntry<>(k2, (LogFileValue) null));
+    unsorted.add(new AbstractMap.SimpleEntry<>(k1, (LogFileValue) null));
+
+    SortCheckIterator iter = new SortCheckIterator(unsorted.iterator());
+
+    while (iter.hasNext()) {
+      iter.next();
+    }
+  }
+
 }
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index 6053cb5f24..bdf5210564 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -51,7 +51,9 @@
 import org.apache.hadoop.io.MapFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
 public class SortedLogRecoveryTest {
@@ -61,6 +63,9 @@
   static final Text cq = new Text("cq");
   static final Value value = new Value("value".getBytes());
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   static class KeyValue implements Comparable<KeyValue> {
     public final LogFileKey key;
     public final LogFileValue value;
@@ -92,7 +97,7 @@ private static KeyValue createKeyValue(LogEvents type, long 
seq, int tid,
     KeyValue result = new KeyValue();
     result.key.event = type;
     result.key.seq = seq;
-    result.key.tid = tid;
+    result.key.tabletId = tid;
     switch (type) {
       case OPEN:
         result.key.tserverSession = (String) fileExtentMutation;
@@ -179,17 +184,17 @@ public void testCompactionCrossesLogs() throws 
IOException {
         createKeyValue(DEFINE_TABLET, 1, 1, extent),
         createKeyValue(COMPACTION_START, 4, 1, "/t1/f1"), 
createKeyValue(MUTATION, 7, 1, m),};
     KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 0, 2, "23"),
-        createKeyValue(DEFINE_TABLET, 1, 2, extent),
-        createKeyValue(COMPACTION_START, 5, 2, "/t1/f2"),
-        createKeyValue(COMPACTION_FINISH, 6, 2, null), 
createKeyValue(MUTATION, 3, 2, ignored),
-        createKeyValue(MUTATION, 4, 2, ignored),};
+        createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 5, 1, "/t1/f2"),
+        createKeyValue(COMPACTION_FINISH, 6, 1, null), 
createKeyValue(MUTATION, 3, 1, ignored),
+        createKeyValue(MUTATION, 4, 1, ignored),};
     KeyValue entries4[] = new KeyValue[] {createKeyValue(OPEN, 0, 3, "69"),
-        createKeyValue(DEFINE_TABLET, 1, 3, extent), createKeyValue(MUTATION, 
2, 3, ignored),
-        createKeyValue(MUTATION, 3, 3, ignored), createKeyValue(MUTATION, 4, 
3, ignored),};
+        createKeyValue(DEFINE_TABLET, 1, 1, extent), createKeyValue(MUTATION, 
2, 1, ignored),
+        createKeyValue(MUTATION, 3, 1, ignored), createKeyValue(MUTATION, 4, 
1, ignored),};
     KeyValue entries5[] = new KeyValue[] {createKeyValue(OPEN, 0, 4, "70"),
-        createKeyValue(DEFINE_TABLET, 1, 4, extent),
-        createKeyValue(COMPACTION_START, 3, 4, "/t1/f3"), 
createKeyValue(MUTATION, 2, 4, ignored),
-        createKeyValue(MUTATION, 6, 4, m2),};
+        createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 3, 1, "/t1/f3"), 
createKeyValue(MUTATION, 2, 1, ignored),
+        createKeyValue(MUTATION, 6, 1, m2),};
 
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("entries", entries);
@@ -203,8 +208,8 @@ public void testCompactionCrossesLogs() throws IOException {
 
     // Verify recovered data
     Assert.assertEquals(2, mutations.size());
-    Assert.assertEquals(m, mutations.get(0));
-    Assert.assertEquals(m2, mutations.get(1));
+    Assert.assertTrue(mutations.contains(m));
+    Assert.assertTrue(mutations.contains(m2));
   }
 
   @Test
@@ -545,7 +550,7 @@ public void testCompactionCrossesLogs4() throws IOException 
{
         // createKeyValue(COMPACTION_FINISH, 17, 1, null),
         // createKeyValue(COMPACTION_START, 18, 1, "somefile"),
         // createKeyValue(COMPACTION_FINISH, 19, 1, null),
-        createKeyValue(MUTATION, 8, 1, m5), createKeyValue(MUTATION, 20, 1, 
m6),};
+        createKeyValue(MUTATION, 9, 1, m5), createKeyValue(MUTATION, 20, 1, 
m6),};
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("entries", entries);
     logs.put("entries2", entries2);
@@ -595,11 +600,11 @@ public void testLookingForBug3() throws IOException {
     List<Mutation> mutations = recover(logs, extent);
     // Verify recovered data
     Assert.assertEquals(5, mutations.size());
-    Assert.assertEquals(m, mutations.get(0));
-    Assert.assertEquals(m2, mutations.get(1));
-    Assert.assertEquals(m3, mutations.get(2));
-    Assert.assertEquals(m4, mutations.get(3));
-    Assert.assertEquals(m5, mutations.get(4));
+    Assert.assertTrue(mutations.contains(m));
+    Assert.assertTrue(mutations.contains(m2));
+    Assert.assertTrue(mutations.contains(m3));
+    Assert.assertTrue(mutations.contains(m4));
+    Assert.assertTrue(mutations.contains(m5));
   }
 
   @Test
@@ -674,6 +679,85 @@ public void testNoFinish1() throws Exception {
     Assert.assertEquals(m, mutations.get(0));
   }
 
+  @Test
+  public void testLeaveAndComeBack() throws IOException {
+    /**
+     * This test recreates the situation in bug #449 (Github issues).
+     */
+    Mutation m1 = new ServerMutation(new Text("r1"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("r2"));
+    m2.put("f1", "q1", "v2");
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 100, 10, extent), 
createKeyValue(MUTATION, 100, 10, m1),
+        createKeyValue(COMPACTION_START, 101, 10, "/t/f1"),
+        createKeyValue(COMPACTION_FINISH, 102, 10, null)};
+
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 1, 20, extent), createKeyValue(MUTATION, 
1, 20, m2)};
+
+    Arrays.sort(entries1);
+    Arrays.sort(entries2);
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("entries1", entries1);
+    logs.put("entries2", entries2);
+
+    List<Mutation> mutations = recover(logs, extent);
+
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m2, mutations.get(0));
+  }
+
+  @Test
+  public void testMultipleTablets() throws IOException {
+    KeyExtent e1 = new KeyExtent("1", new Text("m"), null);
+    KeyExtent e2 = new KeyExtent("1", null, new Text("m"));
+
+    Mutation m1 = new ServerMutation(new Text("b"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("b"));
+    m2.put("f1", "q2", "v2");
+
+    Mutation m3 = new ServerMutation(new Text("s"));
+    m3.put("f1", "q1", "v3");
+
+    Mutation m4 = new ServerMutation(new Text("s"));
+    m4.put("f1", "q2", "v4");
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 7, 10, e1), 
createKeyValue(DEFINE_TABLET, 5, 11, e2),
+        createKeyValue(MUTATION, 8, 10, m1), createKeyValue(COMPACTION_START, 
9, 10, "/t/f1"),
+        createKeyValue(MUTATION, 10, 10, m2), 
createKeyValue(COMPACTION_FINISH, 11, 10, null),
+        createKeyValue(MUTATION, 6, 11, m3), createKeyValue(COMPACTION_START, 
7, 11, "/t/f2"),
+        createKeyValue(MUTATION, 8, 11, m4)};
+
+    Arrays.sort(entries1);
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("entries1", entries1);
+
+    List<Mutation> mutations1 = recover(logs, e1);
+    Assert.assertEquals(1, mutations1.size());
+    Assert.assertEquals(m2, mutations1.get(0));
+
+    List<Mutation> mutations2 = recover(logs, e2);
+    Assert.assertEquals(2, mutations2.size());
+    Assert.assertEquals(m3, mutations2.get(0));
+    Assert.assertEquals(m4, mutations2.get(1));
+
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 9, 11, e2), 
createKeyValue(COMPACTION_FINISH, 10, 11, null)};
+    Arrays.sort(entries2);
+    logs.put("entries2", entries2);
+
+    mutations2 = recover(logs, e2);
+    Assert.assertEquals(1, mutations2.size());
+    Assert.assertEquals(m4, mutations2.get(0));
+  }
+
   private void runPathTest(boolean startMatches, String compactionStartFile, 
String... tabletFiles)
       throws IOException {
     Mutation m1 = new ServerMutation(new Text("row1"));
@@ -733,4 +817,179 @@ public void testPaths() throws IOException {
       }
     }
   }
+
+  @Test
+  public void testOnlyCompactionFinishEvent() throws IOException {
+    Mutation m1 = new ServerMutation(new Text("r1"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("r2"));
+    m2.put("f1", "q1", "v2");
+
+    // The presence of only a compaction finish event indicates the write 
ahead logs are incomplete
+    // in some way. This should cause an exception.
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 100, 10, extent), 
createKeyValue(MUTATION, 100, 10, m1),
+        createKeyValue(COMPACTION_FINISH, 102, 10, null), 
createKeyValue(MUTATION, 105, 10, m2)};
+
+    Arrays.sort(entries1);
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("entries1", entries1);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        LogEvents.COMPACTION_FINISH.name() + " (without preceding " + 
LogEvents.COMPACTION_START);
+    recover(logs, extent);
+  }
+
+  @Test
+  public void testConsecutiveCompactionFinishEvents() throws IOException {
+    Mutation m1 = new ServerMutation(new Text("r1"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("r2"));
+    m2.put("f1", "q1", "v2");
+
+    // Consecutive compaction finish events indicate the write ahead logs are 
incomplete in some
+    // way. This should cause an exception.
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 100, 10, extent), 
createKeyValue(MUTATION, 100, 10, m1),
+        createKeyValue(COMPACTION_START, 102, 10, "/t/f1"),
+        createKeyValue(COMPACTION_FINISH, 103, 10, null),
+        createKeyValue(COMPACTION_FINISH, 109, 10, null), 
createKeyValue(MUTATION, 105, 10, m2)};
+
+    Arrays.sort(entries1);
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("entries1", entries1);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("consecutive " + LogEvents.COMPACTION_FINISH.name());
+    recover(logs, extent);
+  }
+
+  @Test
+  public void testDuplicateCompactionFinishEvents() throws IOException {
+    Mutation m1 = new ServerMutation(new Text("r1"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("r2"));
+    m2.put("f1", "q1", "v2");
+
+    // Duplicate consecutive compaction finish events should not cause an 
exception.
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 100, 10, extent), 
createKeyValue(MUTATION, 100, 10, m1),
+        createKeyValue(COMPACTION_START, 102, 10, "/t/f1"),
+        createKeyValue(COMPACTION_FINISH, 104, 10, null),
+        createKeyValue(COMPACTION_FINISH, 104, 10, null), 
createKeyValue(MUTATION, 103, 10, m2)};
+
+    Arrays.sort(entries1);
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("entries1", entries1);
+
+    List<Mutation> mutations1 = recover(logs, extent);
+    Assert.assertEquals(1, mutations1.size());
+    Assert.assertEquals(m2, mutations1.get(0));
+  }
+
+  @Test
+  public void testEmptyLogFiles() throws IOException {
+    Mutation m1 = new ServerMutation(new Text("r1"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("r2"));
+    m2.put("f1", "q1", "v2");
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 100, 10, extent), 
createKeyValue(MUTATION, 100, 10, m1)};
+
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1")};
+
+    KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 105, 10, extent),
+        createKeyValue(COMPACTION_START, 107, 10, "/t/f1")};
+
+    KeyValue entries4[] = new KeyValue[] {};
+
+    KeyValue entries5[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 107, 10, extent),
+        createKeyValue(COMPACTION_FINISH, 111, 10, null)};
+
+    KeyValue entries6[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 122, 10, extent), 
createKeyValue(MUTATION, 123, 10, m2)};
+
+    Arrays.sort(entries1);
+    Arrays.sort(entries2);
+    Arrays.sort(entries3);
+    Arrays.sort(entries4);
+    Arrays.sort(entries5);
+    Arrays.sort(entries6);
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+
+    logs.put("entries1", entries1);
+
+    List<Mutation> mutations = recover(logs, extent);
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m1, mutations.get(0));
+
+    logs.put("entries2", entries2);
+
+    mutations = recover(logs, extent);
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m1, mutations.get(0));
+
+    logs.put("entries3", entries3);
+
+    mutations = recover(logs, extent);
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m1, mutations.get(0));
+
+    logs.put("entries4", entries4);
+
+    mutations = recover(logs, extent);
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m1, mutations.get(0));
+
+    logs.put("entries5", entries5);
+
+    mutations = recover(logs, extent);
+    Assert.assertEquals(0, mutations.size());
+
+    logs.put("entries6", entries6);
+
+    mutations = recover(logs, extent);
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m2, mutations.get(0));
+  }
+
+  @Test
+  public void testFileWithoutOpen() throws IOException {
+    Mutation m1 = new ServerMutation(new Text("r1"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("r2"));
+    m2.put("f1", "q1", "v2");
+
+    // Its expected that every log file should have an open event as the first 
event. Should throw
+    // an error if not present.
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(DEFINE_TABLET, 100, 
10, extent),
+        createKeyValue(MUTATION, 100, 10, m1), 
createKeyValue(COMPACTION_FINISH, 102, 10, null),
+        createKeyValue(MUTATION, 105, 10, m2)};
+
+    Arrays.sort(entries1);
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("entries1", entries1);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("not " + LogEvents.OPEN);
+    recover(logs, extent);
+  }
 }
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
index 19fb3c923a..5e26a9d653 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
@@ -46,7 +46,7 @@ static private void readWrite(LogEvents event, long seq, int 
tid, String filenam
     LogFileKey key = new LogFileKey();
     key.event = event;
     key.seq = seq;
-    key.tid = tid;
+    key.tabletId = tid;
     key.filename = filename;
     key.tablet = tablet;
     key.tserverSession = keyResult.tserverSession;
@@ -75,24 +75,24 @@ public void testReadFields() throws IOException {
     readWrite(COMPACTION_FINISH, 1, 2, null, null, null, key, value);
     assertEquals(key.event, COMPACTION_FINISH);
     assertEquals(key.seq, 1);
-    assertEquals(key.tid, 2);
+    assertEquals(key.tabletId, 2);
     readWrite(COMPACTION_START, 3, 4, "some file", null, null, key, value);
     assertEquals(key.event, COMPACTION_START);
     assertEquals(key.seq, 3);
-    assertEquals(key.tid, 4);
+    assertEquals(key.tabletId, 4);
     assertEquals(key.filename, "some file");
     KeyExtent tablet = new KeyExtent("table", new Text("bbbb"), new 
Text("aaaa"));
     readWrite(DEFINE_TABLET, 5, 6, null, tablet, null, key, value);
     assertEquals(key.event, DEFINE_TABLET);
     assertEquals(key.seq, 5);
-    assertEquals(key.tid, 6);
+    assertEquals(key.tabletId, 6);
     assertEquals(key.tablet, tablet);
     Mutation m = new ServerMutation(new Text("row"));
     m.put(new Text("cf"), new Text("cq"), new Value("value".getBytes()));
     readWrite(MUTATION, 7, 8, null, null, new Mutation[] {m}, key, value);
     assertEquals(key.event, MUTATION);
     assertEquals(key.seq, 7);
-    assertEquals(key.tid, 8);
+    assertEquals(key.tabletId, 8);
     assertEquals(value.mutations, Arrays.asList(m));
     m = new ServerMutation(new Text("row"));
     m.put(new Text("cf"), new Text("cq"), new ColumnVisibility("vis"), 12345,
@@ -103,12 +103,12 @@ public void testReadFields() throws IOException {
     readWrite(MUTATION, 8, 9, null, null, new Mutation[] {m}, key, value);
     assertEquals(key.event, MUTATION);
     assertEquals(key.seq, 8);
-    assertEquals(key.tid, 9);
+    assertEquals(key.tabletId, 9);
     assertEquals(value.mutations, Arrays.asList(m));
     readWrite(MANY_MUTATIONS, 9, 10, null, null, new Mutation[] {m, m}, key, 
value);
     assertEquals(key.event, MANY_MUTATIONS);
     assertEquals(key.seq, 9);
-    assertEquals(key.tid, 10);
+    assertEquals(key.tabletId, 10);
     assertEquals(value.mutations, Arrays.asList(m, m));
   }
 
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index 1ff9799d1a..107060bfa7 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -79,7 +79,7 @@ public void 
onlyChooseMutationsForDesiredTableWithOpenStatus() throws Exception
      */
     key.event = LogEvents.DEFINE_TABLET;
     key.tablet = new KeyExtent("1", null, null);
-    key.tid = 1;
+    key.tabletId = 1;
 
     key.write(dos);
     value.write(dos);
@@ -94,14 +94,14 @@ public void 
onlyChooseMutationsForDesiredTableWithOpenStatus() throws Exception
 
     key.event = LogEvents.DEFINE_TABLET;
     key.tablet = new KeyExtent("2", null, null);
-    key.tid = 2;
+    key.tabletId = 2;
     value.mutations = Collections.emptyList();
 
     key.write(dos);
     value.write(dos);
 
     key.event = LogEvents.OPEN;
-    key.tid = LogFileKey.VERSION;
+    key.tabletId = LogFileKey.VERSION;
     key.tserverSession = "foobar";
 
     key.write(dos);
@@ -116,7 +116,7 @@ public void 
onlyChooseMutationsForDesiredTableWithOpenStatus() throws Exception
     value.write(dos);
 
     key.event = LogEvents.COMPACTION_START;
-    key.tid = 2;
+    key.tabletId = 2;
     key.filename = "/accumulo/tables/1/t-000001/A000001.rf";
     value.mutations = Collections.emptyList();
 
@@ -125,14 +125,14 @@ public void 
onlyChooseMutationsForDesiredTableWithOpenStatus() throws Exception
 
     key.event = LogEvents.DEFINE_TABLET;
     key.tablet = new KeyExtent("1", null, null);
-    key.tid = 3;
+    key.tabletId = 3;
     value.mutations = Collections.emptyList();
 
     key.write(dos);
     value.write(dos);
 
     key.event = LogEvents.COMPACTION_FINISH;
-    key.tid = 6;
+    key.tabletId = 6;
     value.mutations = Collections.emptyList();
 
     key.write(dos);
@@ -140,7 +140,7 @@ public void 
onlyChooseMutationsForDesiredTableWithOpenStatus() throws Exception
 
     key.tablet = null;
     key.event = LogEvents.MUTATION;
-    key.tid = 3;
+    key.tabletId = 3;
     key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     value.mutations = Arrays.<Mutation> asList(new ServerMutation(new 
Text("row")));
 
@@ -188,7 +188,7 @@ public void 
onlyChooseMutationsForDesiredTableWithClosedStatus() throws Exceptio
      */
     key.event = LogEvents.DEFINE_TABLET;
     key.tablet = new KeyExtent("1", null, null);
-    key.tid = 1;
+    key.tabletId = 1;
 
     key.write(dos);
     value.write(dos);
@@ -203,14 +203,14 @@ public void 
onlyChooseMutationsForDesiredTableWithClosedStatus() throws Exceptio
 
     key.event = LogEvents.DEFINE_TABLET;
     key.tablet = new KeyExtent("2", null, null);
-    key.tid = 2;
+    key.tabletId = 2;
     value.mutations = Collections.emptyList();
 
     key.write(dos);
     value.write(dos);
 
     key.event = LogEvents.OPEN;
-    key.tid = LogFileKey.VERSION;
+    key.tabletId = LogFileKey.VERSION;
     key.tserverSession = "foobar";
 
     key.write(dos);
@@ -225,7 +225,7 @@ public void 
onlyChooseMutationsForDesiredTableWithClosedStatus() throws Exceptio
     value.write(dos);
 
     key.event = LogEvents.COMPACTION_START;
-    key.tid = 2;
+    key.tabletId = 2;
     key.filename = "/accumulo/tables/1/t-000001/A000001.rf";
     value.mutations = Collections.emptyList();
 
@@ -234,14 +234,14 @@ public void 
onlyChooseMutationsForDesiredTableWithClosedStatus() throws Exceptio
 
     key.event = LogEvents.DEFINE_TABLET;
     key.tablet = new KeyExtent("1", null, null);
-    key.tid = 3;
+    key.tabletId = 3;
     value.mutations = Collections.emptyList();
 
     key.write(dos);
     value.write(dos);
 
     key.event = LogEvents.COMPACTION_FINISH;
-    key.tid = 6;
+    key.tabletId = 6;
     value.mutations = Collections.emptyList();
 
     key.write(dos);
@@ -249,7 +249,7 @@ public void 
onlyChooseMutationsForDesiredTableWithClosedStatus() throws Exceptio
 
     key.tablet = null;
     key.event = LogEvents.MUTATION;
-    key.tid = 3;
+    key.tabletId = 3;
     key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     value.mutations = Arrays.<Mutation> asList(new ServerMutation(new 
Text("row")));
 
@@ -396,7 +396,7 @@ public void restartInFileKnowsAboutPreviousTableDefines() 
throws Exception {
      */
     key.event = LogEvents.DEFINE_TABLET;
     key.tablet = new KeyExtent("1", null, null);
-    key.tid = 1;
+    key.tabletId = 1;
 
     key.write(dos);
     value.write(dos);
@@ -411,7 +411,7 @@ public void restartInFileKnowsAboutPreviousTableDefines() 
throws Exception {
 
     key.tablet = null;
     key.event = LogEvents.MUTATION;
-    key.tid = 1;
+    key.tabletId = 1;
     key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     value.mutations = Arrays.<Mutation> asList(new ServerMutation(new 
Text("row")));
 
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
index 6d2c66a3a1..67524a8241 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
@@ -83,7 +83,7 @@ public void systemTimestampsAreSetOnUpdates() throws 
Exception {
     LogFileKey key = new LogFileKey();
     key.event = LogEvents.MANY_MUTATIONS;
     key.seq = 1;
-    key.tid = 1;
+    key.tabletId = 1;
 
     WalEdits edits = new WalEdits();
 
@@ -149,7 +149,7 @@ public void replicationSourcesArePreserved() throws 
Exception {
     LogFileKey key = new LogFileKey();
     key.event = LogEvents.MANY_MUTATIONS;
     key.seq = 1;
-    key.tid = 1;
+    key.tabletId = 1;
 
     WalEdits edits = new WalEdits();
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
 
b/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
index fc47843b10..bdde3f7486 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
@@ -118,7 +118,7 @@ public void test() throws Exception {
     key.event = LogEvents.DEFINE_TABLET;
     key.tablet = new KeyExtent(Integer.toString(fakeTableId), null, null);
     key.seq = 1l;
-    key.tid = 1;
+    key.tabletId = 1;
 
     key.write(dos);
     value.write(dos);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to