This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 80c2e9db8ea6d23e542ffd47c03647441f5e3898
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Apr 27 18:16:51 2018 -0400

    fixes #449 fix two bugs with WAL recovery (#458)
    
     * Fix bug where tablet is unloaded, reloaded on tserver, and then tserver dies
     * Fix bug with out of order logs.  Recovery code assumed logs were passed in
       time order.  However, since 1.8.0 they have been passed in random order.  Rewrote
       recovery code to handle out of order logs.  The fix was to read all logs in
       a sorted merged way.
---
 .../accumulo/tserver/log/RecoveryLogsIterator.java | 241 ++++++++++++++
 .../accumulo/tserver/log/SortedLogRecovery.java    | 356 ++++++++++-----------
 .../tserver/log/SortedLogRecoveryTest.java         | 116 ++++++-
 3 files changed, 506 insertions(+), 207 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
new file mode 100644
index 0000000..ba25322
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.Path;
+import org.mortbay.log.Log;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * Iterates over multiple recovery logs merging them into a single sorted 
stream.
+ */
+public class RecoveryLogsIterator
+    implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {
+
+  private List<MultiReader> readers;
+  private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+
+  private static class MultiReaderIterator implements 
Iterator<Entry<LogFileKey,LogFileValue>> {
+
+    private MultiReader reader;
+    private LogFileKey key = new LogFileKey();
+    private LogFileValue value = new LogFileValue();
+    private boolean hasNext;
+    private LogFileKey end;
+
+    MultiReaderIterator(MultiReader reader, LogFileKey start, LogFileKey end) 
throws IOException {
+      this.reader = reader;
+      this.end = end;
+
+      reader.seek(start);
+
+      hasNext = reader.next(key, value);
+
+      if (hasNext && key.compareTo(start) < 0) {
+        throw new IllegalStateException("First key is less than start " + key 
+ " " + start);
+      }
+
+      if (hasNext && key.compareTo(end) > 0) {
+        hasNext = false;
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return hasNext;
+    }
+
+    @Override
+    public Entry<LogFileKey,LogFileValue> next() {
+      Preconditions.checkState(hasNext);
+      Entry<LogFileKey,LogFileValue> entry = new 
AbstractMap.SimpleImmutableEntry<>(key, value);
+
+      key = new LogFileKey();
+      value = new LogFileValue();
+      try {
+        hasNext = reader.next(key, value);
+        if (hasNext && key.compareTo(end) > 0) {
+          hasNext = false;
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+
+      return entry;
+    }
+  }
+
+  static class SortCheckIterator implements 
Iterator<Entry<LogFileKey,LogFileValue>> {
+
+    private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
+    private String sourceName;
+
+    SortCheckIterator(String sourceName, 
Iterator<Entry<LogFileKey,LogFileValue>> source) {
+      this.source = Iterators.peekingIterator(source);
+      this.sourceName = sourceName;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return source.hasNext();
+    }
+
+    @Override
+    public Entry<LogFileKey,LogFileValue> next() {
+      Entry<LogFileKey,LogFileValue> next = source.next();
+      if (source.hasNext()) {
+        
Preconditions.checkState(next.getKey().compareTo(source.peek().getKey()) <= 0,
+            "Data source %s keys not in order %s %s", sourceName, 
next.getKey(),
+            source.peek().getKey());
+      }
+      return next;
+    }
+  }
+
+  private MultiReader open(VolumeManager fs, Path log) throws IOException {
+    MultiReader reader = new MultiReader(fs, log);
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+    if (!reader.next(key, value)) {
+      reader.close();
+      return null;
+    }
+    if (key.event != OPEN) {
+      reader.close();
+      throw new RuntimeException("First log entry value is not OPEN");
+    }
+
+    return reader;
+  }
+
+  static LogFileKey maxKey(LogEvents event) {
+    LogFileKey key = new LogFileKey();
+    key.event = event;
+    key.tid = Integer.MAX_VALUE;
+    key.seq = Long.MAX_VALUE;
+    return key;
+  }
+
+  static LogFileKey maxKey(LogEvents event, int tid) {
+    LogFileKey key = maxKey(event);
+    key.tid = tid;
+    return key;
+  }
+
+  static LogFileKey minKey(LogEvents event) {
+    LogFileKey key = new LogFileKey();
+    key.event = event;
+    key.tid = 0;
+    key.seq = 0;
+    return key;
+  }
+
+  static LogFileKey minKey(LogEvents event, int tid) {
+    LogFileKey key = minKey(event);
+    key.tid = tid;
+    return key;
+  }
+
+  /**
+   * Iterates only over keys with the specified event (some events are 
equivalent for sorting) and
+   * tid type.
+   */
+  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, 
LogEvents event, int tid)
+      throws IOException {
+    this(fs, recoveryLogPaths, minKey(event, tid), maxKey(event, tid));
+  }
+
+  /**
+   * Iterates only over keys with the specified event (some events are 
equivalent for sorting).
+   */
+  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, 
LogEvents event)
+      throws IOException {
+    this(fs, recoveryLogPaths, minKey(event), maxKey(event));
+  }
+
+  /**
+   * Iterates only over keys between [start,end].
+   */
+  RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, 
LogFileKey start,
+      LogFileKey end) throws IOException {
+    readers = new ArrayList<>(recoveryLogPaths.size());
+
+    ArrayList<Iterator<Entry<LogFileKey,LogFileValue>>> iterators = new 
ArrayList<>();
+
+    try {
+      for (Path log : recoveryLogPaths) {
+        MultiReader reader = open(fs, log);
+        if (reader != null) {
+          readers.add(reader);
+          iterators.add(
+              new SortCheckIterator(log.getName(), new 
MultiReaderIterator(reader, start, end)));
+        }
+      }
+
+      iter = Iterators.mergeSorted(iterators, new 
Comparator<Entry<LogFileKey,LogFileValue>>() {
+        @Override
+        public int compare(Entry<LogFileKey,LogFileValue> o1, 
Entry<LogFileKey,LogFileValue> o2) {
+          return o1.getKey().compareTo(o2.getKey());
+        }
+      });
+
+    } catch (RuntimeException | IOException e) {
+      close();
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iter.hasNext();
+  }
+
+  @Override
+  public Entry<LogFileKey,LogFileValue> next() {
+    return iter.next();
+  }
+
+  @Override
+  public void close() {
+    for (MultiReader reader : readers) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        Log.debug("Failed to close reader", e);
+      }
+    }
+  }
+}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index a850d2f..2c2a463 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -16,50 +16,46 @@
  */
 package org.apache.accumulo.tserver.log;
 
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.accumulo.tserver.log.RecoveryLogsIterator.maxKey;
+import static org.apache.accumulo.tserver.log.RecoveryLogsIterator.minKey;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
 import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
-import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
 /**
  * Extract Mutations for a tablet from a set of logs that have been sorted by 
operation and tablet.
  *
  */
 public class SortedLogRecovery {
-  private static final Logger log = 
LoggerFactory.getLogger(SortedLogRecovery.class);
-
-  static class EmptyMapFileException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public EmptyMapFileException() {
-      super();
-    }
-  }
 
-  static class UnusedException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public UnusedException() {
-      super();
-    }
-  }
+  private static final Logger log = 
LoggerFactory.getLogger(SortedLogRecovery.class);
 
   private VolumeManager fs;
 
@@ -67,81 +63,32 @@ public class SortedLogRecovery {
     this.fs = fs;
   }
 
-  private enum Status {
-    INITIAL, LOOKING_FOR_FINISH, COMPLETE
-  }
-
-  private static class LastStartToFinish {
-    long lastStart = -1;
-    long seq = -1;
-    long lastFinish = -1;
-    Status compactionStatus = Status.INITIAL;
-    String tserverSession = "";
-
-    private void update(long newFinish) {
-      this.seq = this.lastStart;
-      if (newFinish != -1)
-        lastFinish = newFinish;
-    }
+  private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogs) 
throws IOException {
+    int tid = -1;
 
-    private void update(int newStartFile, long newStart) {
-      this.lastStart = newStart;
-    }
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, 
DEFINE_TABLET)) {
+      KeyExtent alternative = extent;
+      if (extent.isRootTablet()) {
+        alternative = RootTable.OLD_EXTENT;
+      }
 
-    private void update(String newSession) {
-      this.lastStart = -1;
-      this.lastFinish = -1;
-      this.compactionStatus = Status.INITIAL;
-      this.tserverSession = newSession;
-    }
-  }
+      while (rli.hasNext()) {
+        LogFileKey key = rli.next().getKey();
 
-  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> 
tabletFiles,
-      MutationReceiver mr) throws IOException {
-    int[] tids = new int[recoveryLogs.size()];
-    LastStartToFinish lastStartToFinish = new LastStartToFinish();
-    for (int i = 0; i < recoveryLogs.size(); i++) {
-      Path logfile = recoveryLogs.get(i);
-      log.info("Looking at mutations from " + logfile + " for " + extent);
-      MultiReader reader = new MultiReader(fs, logfile);
-      try {
-        try {
-          tids[i] = findLastStartToFinish(reader, i, extent, tabletFiles, 
lastStartToFinish);
-        } catch (EmptyMapFileException ex) {
-          log.info("Ignoring empty map file " + logfile);
-          tids[i] = -1;
-        } catch (UnusedException ex) {
-          log.info("Ignoring log file " + logfile + " appears to be unused by 
" + extent);
-          tids[i] = -1;
-        }
-      } finally {
-        try {
-          reader.close();
-        } catch (IOException ex) {
-          log.warn("Ignoring error closing file");
-        }
-      }
+        checkState(key.event == DEFINE_TABLET); // should only fail if bug 
elsewhere
 
-    }
+        if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
+          checkState(key.tid >= 0, "Tid %s for %s is negative", key.tid, 
extent);
+          checkState(tid == -1 || key.tid >= tid); // should only fail if bug 
in
+                                                   // RecoveryLogsIterator
 
-    if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
-      throw new RuntimeException("COMPACTION_FINISH (without preceding"
-          + " COMPACTION_START) not followed by successful minor compaction");
-
-    for (int i = 0; i < recoveryLogs.size(); i++) {
-      Path logfile = recoveryLogs.get(i);
-      MultiReader reader = new MultiReader(fs, logfile);
-      try {
-        playbackMutations(reader, tids[i], lastStartToFinish, mr);
-      } finally {
-        try {
-          reader.close();
-        } catch (IOException ex) {
-          log.warn("Ignoring error closing file");
+          if (tid != key.tid) {
+            tid = key.tid;
+          }
         }
       }
-      log.info("Recovery complete for " + extent + " using " + logfile);
     }
+    return tid;
   }
 
   private String getPathSuffix(String pathString) {
@@ -151,126 +98,155 @@ public class SortedLogRecovery {
     return path.getParent().getName() + "/" + path.getName();
   }
 
-  int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent,
-      Set<String> tabletFiles, LastStartToFinish lastStartToFinish)
-      throws IOException, EmptyMapFileException, UnusedException {
+  static class DeduplicatingIterator implements 
Iterator<Entry<LogFileKey,LogFileValue>> {
+
+    private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
+
+    public DeduplicatingIterator(Iterator<Entry<LogFileKey,LogFileValue>> 
source) {
+      this.source = Iterators.peekingIterator(source);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return source.hasNext();
+    }
+
+    @Override
+    public Entry<LogFileKey,LogFileValue> next() {
+      Entry<LogFileKey,LogFileValue> next = source.next();
+
+      while (source.hasNext() && 
next.getKey().compareTo(source.peek().getKey()) == 0) {
+        source.next();
+      }
+
+      return next;
+    }
+
+  }
 
+  private long findLastStartToFinish(List<Path> recoveryLogs, Set<String> 
tabletFiles, int tid)
+      throws IOException {
     HashSet<String> suffixes = new HashSet<>();
     for (String path : tabletFiles)
       suffixes.add(getPathSuffix(path));
 
-    // Scan for tableId for this extent (should always be in the log)
-    LogFileKey key = new LogFileKey();
-    LogFileValue value = new LogFileValue();
-    int tid = -1;
-    if (!reader.next(key, value))
-      throw new EmptyMapFileException();
-    if (key.event != OPEN)
-      throw new RuntimeException("First log entry value is not OPEN");
-
-    if (key.tserverSession.compareTo(lastStartToFinish.tserverSession) != 0) {
-      if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
-        throw new RuntimeException("COMPACTION_FINISH (without preceding"
-            + " COMPACTION_START) is not followed by a successful minor 
compaction.");
-      lastStartToFinish.update(key.tserverSession);
-    }
-    KeyExtent alternative = extent;
-    if (extent.isRootTablet()) {
-      alternative = RootTable.OLD_EXTENT;
+    long lastStart = 0;
+    long recoverySeq = 0;
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, 
COMPACTION_START,
+        tid)) {
+
+      DeduplicatingIterator ddi = new DeduplicatingIterator(rli);
+
+      String lastStartFile = null;
+      LogEvents lastEvent = null;
+      boolean firstEventWasFinish = false;
+      boolean sawStartFinish = false;
+
+      while (ddi.hasNext()) {
+        LogFileKey key = ddi.next().getKey();
+
+        checkState(key.seq >= 0, "Unexpected negative seq %s for tid %s", 
key.seq, tid);
+        checkState(key.tid == tid); // should only fail if bug elsewhere
+
+        if (key.event == COMPACTION_START) {
+          checkState(key.seq >= lastStart); // should only fail if bug 
elsewhere
+          lastStart = key.seq;
+          lastStartFile = key.filename;
+        } else if (key.event == COMPACTION_FINISH) {
+          if (lastEvent == null) {
+            firstEventWasFinish = true;
+          } else if (lastEvent == COMPACTION_FINISH) {
+            throw new IllegalStateException(
+                "Saw consecutive COMPACTION_FINISH events " + key.tid + " " + 
key.seq);
+          } else {
+            if (key.seq <= lastStart) {
+              throw new IllegalStateException(
+                  "Compaction finish <= start " + lastStart + " " + key.seq);
+            }
+            recoverySeq = lastStart;
+            lastStartFile = null;
+            sawStartFinish = true;
+          }
+        } else {
+          throw new IllegalStateException("Non compaction event seen " + 
key.event);
+        }
+
+        lastEvent = key.event;
+      }
+
+      if (firstEventWasFinish && !sawStartFinish) {
+        throw new IllegalStateException(
+            "COMPACTION_FINISH (without preceding COMPACTION_START) is not 
followed by a successful minor compaction.");
+      }
+
+      if (lastStartFile != null && 
suffixes.contains(getPathSuffix(lastStartFile))) {
+        // There was no compaction finish event, however the last compaction 
start event has a file
+        // in the metadata table, so the compaction finished.
+        log.debug("Considering compaction start {} {} finished because file {} 
in metadata table",
+            tid, lastStart, getPathSuffix(lastStartFile));
+        recoverySeq = lastStart;
+      }
     }
+    return recoverySeq;
+  }
+
+  private void playbackMutations(List<Path> recoveryLogs, MutationReceiver mr, 
int tid,
+      long recoverySeq) throws IOException {
+    LogFileKey start = minKey(MUTATION, tid);
+    start.seq = recoverySeq;
 
-    LogFileKey defineKey = null;
-
-    // find the maximum tablet id... because a tablet may leave a tserver and 
then come back, in
-    // which case it would have a different tablet id
-    // for the maximum tablet id, find the minimum sequence #... may be ok to 
find the max seq, but
-    // just want to make the code behave like it used to
-    while (reader.next(key, value)) {
-      // log.debug("Event " + key.event + " tablet " + key.tablet);
-      if (key.event != DEFINE_TABLET)
-        break;
-      if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
-        if (tid != key.tid) {
-          tid = key.tid;
-          defineKey = key;
-          key = new LogFileKey();
+    LogFileKey end = maxKey(MUTATION, tid);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, 
start, end)) {
+      while (rli.hasNext()) {
+        Entry<LogFileKey,LogFileValue> entry = rli.next();
+
+        checkState(entry.getKey().tid == tid); // should only fail if bug 
elsewhere
+        checkState(entry.getKey().seq >= recoverySeq); // should only fail if 
bug elsewhere
+
+        if (entry.getKey().event == MUTATION) {
+          mr.receive(entry.getValue().mutations.get(0));
+        } else if (entry.getKey().event == MANY_MUTATIONS) {
+          for (Mutation m : entry.getValue().mutations) {
+            mr.receive(m);
+          }
+        } else {
+          throw new IllegalStateException("Non mutation event seen " + 
entry.getKey().event);
         }
       }
     }
-    if (tid < 0) {
-      throw new UnusedException();
-    }
+  }
 
-    log.debug("Found tid, seq " + tid + " " + defineKey.seq);
-
-    // Scan start/stop events for this tablet
-    key = defineKey;
-    key.event = COMPACTION_START;
-    reader.seek(key);
-    while (reader.next(key, value)) {
-      // LogFileEntry.printEntry(entry);
-      if (key.tid != tid)
-        break;
-      if (key.event == COMPACTION_START) {
-        if (lastStartToFinish.compactionStatus == Status.INITIAL)
-          lastStartToFinish.compactionStatus = Status.COMPLETE;
-        if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for 
start/stop events: "
-              + key.seq + " vs " + lastStartToFinish.lastStart);
-        lastStartToFinish.update(fileno, key.seq);
-
-        // Tablet server finished the minor compaction, but didn't remove the 
entry from the
-        // METADATA table.
-        log.debug(
-            "minor compaction into " + key.filename + " finished, but was 
still in the METADATA");
-        if (suffixes.contains(getPathSuffix(key.filename)))
-          lastStartToFinish.update(-1);
-      } else if (key.event == COMPACTION_FINISH) {
-        if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for 
start/stop events: "
-              + key.seq + " vs " + lastStartToFinish.lastStart);
-        if (lastStartToFinish.compactionStatus == Status.INITIAL)
-          lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH;
-        else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart)
-          throw new RuntimeException(
-              "COMPACTION_FINISH does not have preceding COMPACTION_START 
event.");
-        else
-          lastStartToFinish.compactionStatus = Status.COMPLETE;
-        lastStartToFinish.update(key.seq);
-      } else
-        break;
-    }
-    return tid;
+  Collection<String> asNames(List<Path> recoveryLogs) {
+    return Collections2.transform(recoveryLogs, new Function<Path,String>() {
+      @Override
+      public String apply(Path input) {
+        return input.getName();
+      }
+    });
   }
 
-  private void playbackMutations(MultiReader reader, int tid, 
LastStartToFinish lastStartToFinish,
+  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> 
tabletFiles,
       MutationReceiver mr) throws IOException {
-    LogFileKey key = new LogFileKey();
-    LogFileValue value = new LogFileValue();
-
-    // Playback mutations after the last stop to finish
-    log.info("Scanning for mutations starting at sequence number " + 
lastStartToFinish.seq
-        + " for tid " + tid);
-    key.event = MUTATION;
-    key.tid = tid;
-    // the seq number for the minor compaction start is now the same as the
-    // last update made to memory. Scan up to that mutation, but not past it.
-    key.seq = lastStartToFinish.seq;
-    reader.seek(key);
-    while (true) {
-      if (!reader.next(key, value))
-        break;
-      if (key.tid != tid)
-        break;
-      if (key.event == MUTATION) {
-        mr.receive(value.mutations.get(0));
-      } else if (key.event == MANY_MUTATIONS) {
-        for (Mutation m : value.mutations) {
-          mr.receive(m);
-        }
-      } else {
-        throw new RuntimeException("unexpected log key type: " + key.event);
-      }
+
+    // A tablet may leave a tserver and then come back, in which case it would 
have a different and
+    // higher tablet id. Only want to consider events in the log related to 
the last time the tablet
+    // was loaded.
+    int tid = findMaxTabletId(extent, recoveryLogs);
+
+    if (tid == -1) {
+      log.info("Tablet {} is not defined in recovery logs {} ", extent, 
asNames(recoveryLogs));
+      return;
     }
+
+    // Find the seq # for the last compaction that started and finished
+    long recoverySeq = findLastStartToFinish(recoveryLogs, tabletFiles, tid);
+
+    log.info("Recovering mutations, tablet:{} tid:{} seq:{} logs:{}", extent, 
tid, recoverySeq,
+        asNames(recoveryLogs));
+
+    // Replay all mutations that were written after the last successful 
compaction started.
+    playbackMutations(recoveryLogs, mr, tid, recoverySeq);
   }
 }
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index 6053cb5..dc48d62 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -179,17 +179,17 @@ public class SortedLogRecoveryTest {
         createKeyValue(DEFINE_TABLET, 1, 1, extent),
         createKeyValue(COMPACTION_START, 4, 1, "/t1/f1"), 
createKeyValue(MUTATION, 7, 1, m),};
     KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 0, 2, "23"),
-        createKeyValue(DEFINE_TABLET, 1, 2, extent),
-        createKeyValue(COMPACTION_START, 5, 2, "/t1/f2"),
-        createKeyValue(COMPACTION_FINISH, 6, 2, null), 
createKeyValue(MUTATION, 3, 2, ignored),
-        createKeyValue(MUTATION, 4, 2, ignored),};
+        createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 5, 1, "/t1/f2"),
+        createKeyValue(COMPACTION_FINISH, 6, 1, null), 
createKeyValue(MUTATION, 3, 1, ignored),
+        createKeyValue(MUTATION, 4, 1, ignored),};
     KeyValue entries4[] = new KeyValue[] {createKeyValue(OPEN, 0, 3, "69"),
-        createKeyValue(DEFINE_TABLET, 1, 3, extent), createKeyValue(MUTATION, 
2, 3, ignored),
-        createKeyValue(MUTATION, 3, 3, ignored), createKeyValue(MUTATION, 4, 
3, ignored),};
+        createKeyValue(DEFINE_TABLET, 1, 1, extent), createKeyValue(MUTATION, 
2, 1, ignored),
+        createKeyValue(MUTATION, 3, 1, ignored), createKeyValue(MUTATION, 4, 
1, ignored),};
     KeyValue entries5[] = new KeyValue[] {createKeyValue(OPEN, 0, 4, "70"),
-        createKeyValue(DEFINE_TABLET, 1, 4, extent),
-        createKeyValue(COMPACTION_START, 3, 4, "/t1/f3"), 
createKeyValue(MUTATION, 2, 4, ignored),
-        createKeyValue(MUTATION, 6, 4, m2),};
+        createKeyValue(DEFINE_TABLET, 1, 1, extent),
+        createKeyValue(COMPACTION_START, 3, 1, "/t1/f3"), 
createKeyValue(MUTATION, 2, 1, ignored),
+        createKeyValue(MUTATION, 6, 1, m2),};
 
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("entries", entries);
@@ -203,8 +203,8 @@ public class SortedLogRecoveryTest {
 
     // Verify recovered data
     Assert.assertEquals(2, mutations.size());
-    Assert.assertEquals(m, mutations.get(0));
-    Assert.assertEquals(m2, mutations.get(1));
+    Assert.assertTrue(mutations.contains(m));
+    Assert.assertTrue(mutations.contains(m2));
   }
 
   @Test
@@ -545,7 +545,7 @@ public class SortedLogRecoveryTest {
         // createKeyValue(COMPACTION_FINISH, 17, 1, null),
         // createKeyValue(COMPACTION_START, 18, 1, "somefile"),
         // createKeyValue(COMPACTION_FINISH, 19, 1, null),
-        createKeyValue(MUTATION, 8, 1, m5), createKeyValue(MUTATION, 20, 1, 
m6),};
+        createKeyValue(MUTATION, 9, 1, m5), createKeyValue(MUTATION, 20, 1, 
m6),};
     Map<String,KeyValue[]> logs = new TreeMap<>();
     logs.put("entries", entries);
     logs.put("entries2", entries2);
@@ -595,11 +595,11 @@ public class SortedLogRecoveryTest {
     List<Mutation> mutations = recover(logs, extent);
     // Verify recovered data
     Assert.assertEquals(5, mutations.size());
-    Assert.assertEquals(m, mutations.get(0));
-    Assert.assertEquals(m2, mutations.get(1));
-    Assert.assertEquals(m3, mutations.get(2));
-    Assert.assertEquals(m4, mutations.get(3));
-    Assert.assertEquals(m5, mutations.get(4));
+    Assert.assertTrue(mutations.contains(m));
+    Assert.assertTrue(mutations.contains(m2));
+    Assert.assertTrue(mutations.contains(m3));
+    Assert.assertTrue(mutations.contains(m4));
+    Assert.assertTrue(mutations.contains(m5));
   }
 
   @Test
@@ -674,6 +674,84 @@ public class SortedLogRecoveryTest {
     Assert.assertEquals(m, mutations.get(0));
   }
 
+  @Test
+  public void testLeaveAndComeBack() throws IOException {
+    // TODO document scenario
+    Mutation m1 = new ServerMutation(new Text("r1"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("r2"));
+    m2.put("f1", "q1", "v2");
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 100, 10, extent), 
createKeyValue(MUTATION, 100, 10, m1),
+        createKeyValue(COMPACTION_START, 101, 10, "/t/f1"),
+        createKeyValue(COMPACTION_FINISH, 102, 10, null)};
+
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 1, 20, extent), createKeyValue(MUTATION, 
1, 20, m2)};
+
+    Arrays.sort(entries1);
+    Arrays.sort(entries2);
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("entries1", entries1);
+    logs.put("entries2", entries2);
+
+    List<Mutation> mutations = recover(logs, extent);
+
+    Assert.assertEquals(1, mutations.size());
+    Assert.assertEquals(m2, mutations.get(0));
+  }
+
+  @Test
+  public void testMultipleTablets() throws IOException {
+    // TODO document scenario
+    KeyExtent e1 = new KeyExtent("1", new Text("m"), null);
+    KeyExtent e2 = new KeyExtent("1", null, new Text("m"));
+
+    Mutation m1 = new ServerMutation(new Text("b"));
+    m1.put("f1", "q1", "v1");
+
+    Mutation m2 = new ServerMutation(new Text("b"));
+    m2.put("f1", "q2", "v2");
+
+    Mutation m3 = new ServerMutation(new Text("s"));
+    m3.put("f1", "q1", "v3");
+
+    Mutation m4 = new ServerMutation(new Text("s"));
+    m4.put("f1", "q2", "v4");
+
+    KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 7, 10, e1), 
createKeyValue(DEFINE_TABLET, 5, 11, e2),
+        createKeyValue(MUTATION, 8, 10, m1), createKeyValue(COMPACTION_START, 
9, 10, "/t/f1"),
+        createKeyValue(MUTATION, 10, 10, m2), 
createKeyValue(COMPACTION_FINISH, 11, 10, null),
+        createKeyValue(MUTATION, 6, 11, m3), createKeyValue(COMPACTION_START, 
7, 11, "/t/f2"),
+        createKeyValue(MUTATION, 8, 11, m4)};
+
+    Arrays.sort(entries1);
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("entries1", entries1);
+
+    List<Mutation> mutations1 = recover(logs, e1);
+    Assert.assertEquals(1, mutations1.size());
+    Assert.assertEquals(m2, mutations1.get(0));
+
+    List<Mutation> mutations2 = recover(logs, e2);
+    Assert.assertEquals(2, mutations2.size());
+    Assert.assertEquals(m3, mutations2.get(0));
+    Assert.assertEquals(m4, mutations2.get(1));
+
+    KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+        createKeyValue(DEFINE_TABLET, 9, 11, e2), 
createKeyValue(COMPACTION_FINISH, 10, 11, null)};
+    Arrays.sort(entries2);
+    logs.put("entries2", entries2);
+
+    mutations2 = recover(logs, e2);
+    Assert.assertEquals(1, mutations2.size());
+    Assert.assertEquals(m4, mutations2.get(0));
+  }
+
   private void runPathTest(boolean startMatches, String compactionStartFile, 
String... tabletFiles)
       throws IOException {
     Mutation m1 = new ServerMutation(new Text("row1"));
@@ -733,4 +811,8 @@ public class SortedLogRecoveryTest {
       }
     }
   }
+
+  // TODO test only logs with only a compaction finish event
+  // TODO test logs with consecutive compaction finish events
+  // TODO test logs with consecutive duplicate compaction finish events
 }

-- 
To stop receiving notification emails like this one, please contact
ktur...@apache.org.

Reply via email to