This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 3e765b1 Make sorted recovery write to RFiles (#2117) 3e765b1 is described below commit 3e765b1f3b487092206cee036097820c37b7907e Author: Mike Miller <mmil...@apache.org> AuthorDate: Mon Jun 21 09:53:33 2021 -0400 Make sorted recovery write to RFiles (#2117) * Change LogSorter and SortedLogRecovery to write/read RFile instead of MapFile * Rewrite LogSorter writeBuffer method and make static to test * Create toKey() and fromKey() methods in LogFileKey for converting WAL keys * Create toValue() and FromValue() methods in LogFileValue for converting WAL values * Byte encode the rows of the Key to sort WAL events properly * Make RecoveryLogsIterator convert Key Value pairs on next() Co-authored-by: Keith Turner <ktur...@apache.org> Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../org/apache/accumulo/tserver/TabletServer.java | 6 +- .../org/apache/accumulo/tserver/log/LogSorter.java | 64 +++++--- .../accumulo/tserver/log/RecoveryLogReader.java | 2 +- .../accumulo/tserver/log/RecoveryLogsIterator.java | 124 +++++++++++---- .../accumulo/tserver/log/SortedLogRecovery.java | 63 ++++---- .../accumulo/tserver/log/TabletServerLogger.java | 9 +- .../apache/accumulo/tserver/logger/LogFileKey.java | 167 +++++++++++++++++++++ .../accumulo/tserver/logger/LogFileValue.java | 28 ++++ .../tserver/log/SortedLogRecoveryTest.java | 79 +++++++--- .../tserver/log/TestUpgradePathForWALogs.java | 3 + 10 files changed, 436 insertions(+), 109 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 6887f4a..2ab89fa 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -1133,7 +1133,7 @@ public class TabletServer extends AbstractServer { public void recover(VolumeManager 
fs, KeyExtent extent, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException { - List<Path> recoveryLogs = new ArrayList<>(); + List<Path> recoveryDirs = new ArrayList<>(); List<LogEntry> sorted = new ArrayList<>(logEntries); sorted.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp)); for (LogEntry entry : sorted) { @@ -1148,9 +1148,9 @@ public class TabletServer extends AbstractServer { throw new IOException( "Unable to find recovery files for extent " + extent + " logEntry: " + entry); } - recoveryLogs.add(recovery); + recoveryDirs.add(recovery); } - logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver); + logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver); } public int createLogId() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 98fe935..293ff82 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -23,16 +23,20 @@ import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import java.util.concurrent.ThreadPoolExecutor; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.master.thrift.RecoveryStatus; import org.apache.accumulo.core.util.Pair; import 
org.apache.accumulo.core.util.threads.ThreadPools; @@ -47,11 +51,12 @@ import org.apache.accumulo.tserver.logger.LogFileValue; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.MapFile; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + public class LogSorter { private static final Logger log = LoggerFactory.getLogger(LogSorter.class); @@ -141,7 +146,7 @@ public class LogSorter { // Creating a 'finished' marker will cause recovery to proceed normally and the // empty file will be correctly ignored downstream. fs.mkdirs(new Path(destPath)); - writeBuffer(destPath, Collections.emptyList(), part++); + writeBuffer(context, destPath, Collections.emptyList(), part++); fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close(); return; } @@ -159,10 +164,10 @@ public class LogSorter { value.readFields(decryptingInput); buffer.add(new Pair<>(key, value)); } - writeBuffer(destPath, buffer, part++); + writeBuffer(context, destPath, buffer, part++); buffer.clear(); } catch (EOFException ex) { - writeBuffer(destPath, buffer, part++); + writeBuffer(context, destPath, buffer, part++); break; } } @@ -171,21 +176,6 @@ public class LogSorter { getSortTime()); } - private void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part) - throws IOException { - Path path = new Path(destPath, String.format("part-r-%05d", part)); - FileSystem ns = context.getVolumeManager().getFileSystemByPath(path); - - try (MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns.makeQualified(path), - MapFile.Writer.keyClass(LogFileKey.class), - MapFile.Writer.valueClass(LogFileValue.class))) { - buffer.sort(Comparator.comparing(Pair::getFirst)); - for (Pair<LogFileKey,LogFileValue> entry : buffer) { - output.append(entry.getFirst(), entry.getSecond()); - } - } 
- } - synchronized void close() throws IOException { // If we receive an empty or malformed-header WAL, we won't // have input streams that need closing. Avoid the NPE. @@ -224,6 +214,40 @@ public class LogSorter { this.walBlockSize = DfsLogger.getWalBlockSize(conf); } + @VisibleForTesting + public static void writeBuffer(ServerContext context, String destPath, + List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException { + String filename = String.format("part-r-%05d.rf", part); + Path path = new Path(destPath, filename); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(path); + Path fullPath = fs.makeQualified(path); + + // convert the LogFileKeys to Keys, sort and collect the mutations + Map<Key,List<Mutation>> keyListMap = new TreeMap<>(); + for (Pair<LogFileKey,LogFileValue> pair : buffer) { + var logFileKey = pair.getFirst(); + var logFileValue = pair.getSecond(); + Key k = logFileKey.toKey(); + var list = keyListMap.putIfAbsent(k, logFileValue.mutations); + if (list != null) { + var muts = new ArrayList<>(list); + muts.addAll(logFileValue.mutations); + keyListMap.put(logFileKey.toKey(), muts); + } + } + + try (var writer = FileOperations.getInstance().newWriterBuilder() + .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService()) + .withTableConfiguration(DefaultConfiguration.getInstance()).build()) { + writer.startDefaultLocalityGroup(); + for (var entry : keyListMap.entrySet()) { + LogFileValue val = new LogFileValue(); + val.mutations = entry.getValue(); + writer.append(entry.getKey(), val.toValue()); + } + } + } + public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException { this.threadPool = distWorkQThreadPool; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java index 4afa85d..bde5a1c 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java @@ -138,7 +138,7 @@ public class RecoveryLogReader implements CloseableIterator<Entry<LogFileKey,Log } if (!foundFinish) throw new IOException( - "Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory); + "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory); iter = new SortCheckIterator(new RangeIterator(start, end)); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index d2128f5..0f5e259 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ -19,61 +19,82 @@ package org.apache.accumulo.tserver.log; import java.io.IOException; +import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.log.SortedLogState; +import org.apache.accumulo.tserver.logger.LogEvents; import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
com.google.common.collect.Iterators; -import com.google.common.collect.UnmodifiableIterator; /** * Iterates over multiple sorted recovery logs merging them into a single sorted stream. */ -public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> { +public class RecoveryLogsIterator + implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class); - List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators; - private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter; + private final List<Scanner> scanners; + private final Iterator<Entry<Key,Value>> iter; /** - * Iterates only over keys in the range [start,end]. + * Scans the files in each recoveryLogDir over the range [start,end]. */ - RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start, - LogFileKey end) throws IOException { + RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start, + LogFileKey end, boolean checkFirstKey) throws IOException { - iterators = new ArrayList<>(recoveryLogPaths.size()); + List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size()); + scanners = new ArrayList<>(); + Range range = LogFileKey.toRange(start, end); + var vm = context.getVolumeManager(); - try { - for (Path log : recoveryLogPaths) { - LOG.debug("Opening recovery log {}", log.getName()); - RecoveryLogReader rlr = new RecoveryLogReader(fs, log, start, end); - if (rlr.hasNext()) { + for (Path logDir : recoveryLogDirs) { + LOG.debug("Opening recovery log dir {}", logDir.getName()); + List<Path> logFiles = getFiles(vm, logDir); + var fs = vm.getFileSystemByPath(logDir); + + // only check the first key once to prevent extra iterator creation and seeking + if (checkFirstKey) { + validateFirstKey(context, fs, logFiles, logDir); + } + + for (Path log : logFiles) { + var scanner = 
RFile.newScanner().from(log.toString()).withFileSystem(fs) + .withTableProperties(context.getConfiguration()).build(); + + scanner.setRange(range); + Iterator<Entry<Key,Value>> scanIter = scanner.iterator(); + + if (scanIter.hasNext()) { LOG.debug("Write ahead log {} has data in range {} {}", log.getName(), start, end); - iterators.add(rlr); + iterators.add(scanIter); + scanners.add(scanner); } else { LOG.debug("Write ahead log {} has no data in range {} {}", log.getName(), start, end); - rlr.close(); + scanner.close(); } } - - iter = Iterators.mergeSorted(iterators, (o1, o2) -> o1.getKey().compareTo(o2.getKey())); - - } catch (RuntimeException | IOException e) { - try { - close(); - } catch (Exception e2) { - e.addSuppressed(e2); - } - throw e; } + iter = Iterators.mergeSorted(iterators, Entry.comparingByKey()); } @Override @@ -83,7 +104,9 @@ public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey, @Override public Entry<LogFileKey,LogFileValue> next() { - return iter.next(); + Entry<Key,Value> e = iter.next(); + return new AbstractMap.SimpleImmutableEntry<>(LogFileKey.fromKey(e.getKey()), + LogFileValue.fromValue(e.getValue())); } @Override @@ -93,11 +116,50 @@ public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey, @Override public void close() { - for (CloseableIterator<?> reader : iterators) { - try { - reader.close(); - } catch (IOException e) { - LOG.debug("Failed to close reader", e); + scanners.forEach(ScannerBase::close); + } + + /** + * Check for sorting signal files (finished/failed) and get the logs in the provided directory. 
+ */ + private List<Path> getFiles(VolumeManager fs, Path directory) throws IOException { + boolean foundFinish = false; + List<Path> logFiles = new ArrayList<>(); + for (FileStatus child : fs.listStatus(directory)) { + if (child.getPath().getName().startsWith("_")) + continue; + if (SortedLogState.isFinished(child.getPath().getName())) { + foundFinish = true; + continue; + } + if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) { + continue; + } + FileSystem ns = fs.getFileSystemByPath(child.getPath()); + Path fullLogPath = ns.makeQualified(child.getPath()); + logFiles.add(fullLogPath); + } + if (!foundFinish) + throw new IOException( + "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory); + return logFiles; + } + + /** + * Check that the first entry in the WAL is OPEN. Only need to do this once. + */ + private void validateFirstKey(ServerContext context, FileSystem fs, List<Path> logFiles, + Path fullLogPath) { + try (var scanner = + RFile.newScanner().from(logFiles.stream().map(Path::toString).toArray(String[]::new)) + .withFileSystem(fs).withTableProperties(context.getConfiguration()).build()) { + Iterator<Entry<Key,Value>> iterator = scanner.iterator(); + if (iterator.hasNext()) { + Key firstKey = iterator.next().getKey(); + LogFileKey key = LogFileKey.fromKey(firstKey); + if (key.event != LogEvents.OPEN) { + throw new IllegalStateException("First log entry is not OPEN " + fullLogPath); + } } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index 9fc425a..432b24a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.log; import static com.google.common.base.Preconditions.checkState; 
+import static java.util.Collections.max; import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH; import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START; import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; @@ -30,6 +31,7 @@ import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -41,7 +43,7 @@ import java.util.Set; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.tserver.logger.LogEvents; import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; @@ -61,10 +63,10 @@ public class SortedLogRecovery { private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class); - private VolumeManager fs; + private final ServerContext context; - public SortedLogRecovery(VolumeManager fs) { - this.fs = fs; + public SortedLogRecovery(ServerContext context) { + this.context = context; } static LogFileKey maxKey(LogEvents event) { @@ -98,11 +100,11 @@ public class SortedLogRecovery { return key; } - private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogs) throws IOException { + private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogDirs) throws IOException { int tabletId = -1; - try (RecoveryLogsIterator rli = - new RecoveryLogsIterator(fs, recoveryLogs, minKey(DEFINE_TABLET), maxKey(DEFINE_TABLET))) { + try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET), + maxKey(DEFINE_TABLET), true)) { KeyExtent alternative = extent; if (extent.isRootTablet()) { @@ -138,24 +140,24 @@ public class 
SortedLogRecovery { * ID. */ private Entry<Integer,List<Path>> findLogsThatDefineTablet(KeyExtent extent, - List<Path> recoveryLogs) throws IOException { + List<Path> recoveryDirs) throws IOException { Map<Integer,List<Path>> logsThatDefineTablet = new HashMap<>(); - for (Path wal : recoveryLogs) { - int tabletId = findMaxTabletId(extent, Collections.singletonList(wal)); + for (Path walDir : recoveryDirs) { + int tabletId = findMaxTabletId(extent, Collections.singletonList(walDir)); if (tabletId == -1) { - log.debug("Did not find tablet {} in recovery log {}", extent, wal.getName()); + log.debug("Did not find tablet {} in recovery log {}", extent, walDir.getName()); } else { - logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(wal); - log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, wal.getName()); + logsThatDefineTablet.computeIfAbsent(tabletId, k -> new ArrayList<>()).add(walDir); + log.debug("Found tablet {} with id {} in recovery log {}", extent, tabletId, + walDir.getName()); } } if (logsThatDefineTablet.isEmpty()) { - return new AbstractMap.SimpleEntry<>(-1, Collections.<Path>emptyList()); + return new AbstractMap.SimpleEntry<>(-1, Collections.emptyList()); } else { - return Collections.max(logsThatDefineTablet.entrySet(), - (o1, o2) -> Integer.compare(o1.getKey(), o2.getKey())); + return max(logsThatDefineTablet.entrySet(), Comparator.comparingInt(Entry::getKey)); } } @@ -202,8 +204,8 @@ public class SortedLogRecovery { long lastFinish = 0; long recoverySeq = 0; - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, - minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId))) { + try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, recoveryLogs, + minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId), false)) { DeduplicatingIterator ddi = new DeduplicatingIterator(rli); @@ -258,21 +260,22 @@ public class SortedLogRecovery { LogFileKey end = 
maxKey(MUTATION, tabletId); - try (RecoveryLogsIterator rli = new RecoveryLogsIterator(fs, recoveryLogs, start, end)) { + try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false)) { while (rli.hasNext()) { Entry<LogFileKey,LogFileValue> entry = rli.next(); + LogFileKey logFileKey = entry.getKey(); - checkState(entry.getKey().tabletId == tabletId); // should only fail if bug elsewhere - checkState(entry.getKey().seq >= recoverySeq); // should only fail if bug elsewhere + checkState(logFileKey.tabletId == tabletId); // should only fail if bug elsewhere + checkState(logFileKey.seq >= recoverySeq); // should only fail if bug elsewhere - if (entry.getKey().event == MUTATION) { - mr.receive(entry.getValue().mutations.get(0)); - } else if (entry.getKey().event == MANY_MUTATIONS) { - for (Mutation m : entry.getValue().mutations) { + LogFileValue val = entry.getValue(); + if (logFileKey.event == MUTATION || logFileKey.event == MANY_MUTATIONS) { + log.debug("Recover {} mutation(s) for {}", val.mutations.size(), entry.getKey()); + for (Mutation m : val.mutations) { mr.receive(m); } } else { - throw new IllegalStateException("Non mutation event seen " + entry.getKey().event); + throw new IllegalStateException("Non mutation event seen " + logFileKey.event); } } } @@ -282,10 +285,10 @@ public class SortedLogRecovery { return Collections2.transform(recoveryLogs, Path::getName); } - public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles, + public void recover(KeyExtent extent, List<Path> recoveryDirs, Set<String> tabletFiles, MutationReceiver mr) throws IOException { - Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryLogs); + Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs); // A tablet may leave a tserver and then come back, in which case it would have a different and // higher tablet id. 
Only want to consider events in the log related to the last time the tablet @@ -294,11 +297,11 @@ public class SortedLogRecovery { List<Path> logsThatDefineTablet = maxEntry.getValue(); if (tabletId == -1) { - log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryLogs)); + log.info("Tablet {} is not defined in recovery logs {} ", extent, asNames(recoveryDirs)); return; } else { log.info("Found {} of {} logs with max id {} for tablet {}", logsThatDefineTablet.size(), - recoveryLogs.size(), tabletId, extent); + recoveryDirs.size(), tabletId, extent); } // Find the seq # for the last compaction that started and finished diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 0c4e1c2..8d1a113 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.fate.util.Retry; import org.apache.accumulo.fate.util.Retry.RetryFactory; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.replication.StatusUtil; import org.apache.accumulo.server.replication.proto.Replication.Status; @@ -530,11 +531,11 @@ public class TabletServerLogger { return seq; } - public void recover(VolumeManager fs, KeyExtent extent, List<Path> logs, Set<String> tabletFiles, - MutationReceiver mr) throws IOException { + public void recover(ServerContext context, KeyExtent extent, List<Path> recoveryDirs, + Set<String> tabletFiles, MutationReceiver mr) throws IOException { try { - SortedLogRecovery recovery = new SortedLogRecovery(fs); - recovery.recover(extent, logs, tabletFiles, mr); + 
SortedLogRecovery recovery = new SortedLogRecovery(context); + recovery.recover(extent, recoveryDirs, tabletFiles, mr); } catch (Exception e) { throw new IOException(e); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java index 31dfce7..bf08ca4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.tserver.logger; +import static java.util.Arrays.copyOf; import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS; import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION; @@ -26,10 +27,18 @@ import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; +import com.google.common.base.Preconditions; + public class LogFileKey implements WritableComparable<LogFileKey> { public LogEvents event; @@ -189,4 +198,162 @@ public class LogFileKey implements WritableComparable<LogFileKey> { } throw new RuntimeException("Unknown type of entry: " + event); } + + /** + * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields + * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed + * of 3 fields: EventNum + tabletID + seq. The EventNum is the byte returned by eventType(). 
The + * column family is always the event. The column qualifier is dependent of the type of event and + * could be empty. + * + * <pre> + * Key Schema: + * Row = EventNum + tabletID + seq + * Family = event + * Qualifier = tserverSession OR filename OR KeyExtent + * </pre> + */ + public Key toKey() throws IOException { + byte[] formattedRow; + String family = event.name(); + var kb = Key.builder(); + switch (event) { + case OPEN: + formattedRow = formatRow(0, 0); + return kb.row(formattedRow).family(family).qualifier(tserverSession).build(); + case COMPACTION_START: + formattedRow = formatRow(tabletId, seq); + return kb.row(formattedRow).family(family).qualifier(filename).build(); + case MUTATION: + case MANY_MUTATIONS: + case COMPACTION_FINISH: + return kb.row(formatRow(tabletId, seq)).family(family).build(); + case DEFINE_TABLET: + formattedRow = formatRow(tabletId, seq); + DataOutputBuffer buffer = new DataOutputBuffer(); + tablet.writeTo(buffer); + var q = copyOf(buffer.getData(), buffer.getLength()); + buffer.close(); + return kb.row(formattedRow).family(family).qualifier(q).build(); + default: + throw new AssertionError("Invalid event type in LogFileKey: " + event); + } + } + + /** + * Get the first byte for the event. The only possible values are 0-4. This is used as the highest + * byte in the row. + */ + private byte getEventByte() { + int evenTypeInteger = eventType(event); + return (byte) (evenTypeInteger & 0xff); + } + + /** + * Get the byte encoded row for this LogFileKey as a Text object. + */ + private Text formatRow() { + return new Text(formatRow(tabletId, seq)); + } + + /** + * Format the row using 13 bytes encoded to allow proper sorting of the RFile Key. The highest + * byte is for the event number, 4 bytes for the tabletId and 8 bytes for the sequence long. + */ + private byte[] formatRow(int tabletId, long seq) { + byte eventNum = getEventByte(); + // These will not sort properly when encoded if negative. 
Negative is not expected currently, + // defending against future changes and/or bugs. + Preconditions.checkArgument(eventNum >= 0 && seq >= 0); + byte[] row = new byte[13]; + // encode the signed integer so negatives will sort properly for tabletId + int encodedTabletId = tabletId ^ 0x80000000; + + row[0] = eventNum; + row[1] = (byte) ((encodedTabletId >>> 24) & 0xff); + row[2] = (byte) ((encodedTabletId >>> 16) & 0xff); + row[3] = (byte) ((encodedTabletId >>> 8) & 0xff); + row[4] = (byte) (encodedTabletId & 0xff); + row[5] = (byte) (seq >>> 56); + row[6] = (byte) (seq >>> 48); + row[7] = (byte) (seq >>> 40); + row[8] = (byte) (seq >>> 32); + row[9] = (byte) (seq >>> 24); + row[10] = (byte) (seq >>> 16); + row[11] = (byte) (seq >>> 8); + row[12] = (byte) (seq); // >>> 0 + return row; + } + + /** + * Extract the tabletId integer from the byte encoded Row. + */ + private static int getTabletId(byte[] row) { + int encoded = ((row[1] << 24) + (row[2] << 16) + (row[3] << 8) + row[4]); + return encoded ^ 0x80000000; + } + + /** + * Extract the sequence long from the byte encoded Row. + */ + private static long getSequence(byte[] row) { + // @formatter:off + return (((long) row[5] << 56) + + ((long) (row[6] & 255) << 48) + + ((long) (row[7] & 255) << 40) + + ((long) (row[8] & 255) << 32) + + ((long) (row[9] & 255) << 24) + + ((row[10] & 255) << 16) + + ((row[11] & 255) << 8) + + ((row[12] & 255))); + // @formatter:on + } + + public static Range toRange(LogFileKey start, LogFileKey end) { + return new Range(start.formatRow(), end.formatRow()); + } + + /** + * Create LogFileKey from row. 
Follows schema defined by {@link #toKey()} + */ + public static LogFileKey fromKey(Key key) { + var logFileKey = new LogFileKey(); + byte[] rowParts = key.getRow().getBytes(); + + logFileKey.tabletId = getTabletId(rowParts); + logFileKey.seq = getSequence(rowParts); + logFileKey.event = LogEvents.valueOf(key.getColumnFamilyData().toString()); + // verify event number in row matches column family + if (eventType(logFileKey.event) != rowParts[0]) { + throw new AssertionError("Event in row differs from column family. Key: " + key); + } + + // handle special cases of what is stored in the qualifier + switch (logFileKey.event) { + case OPEN: + logFileKey.tserverSession = key.getColumnQualifierData().toString(); + break; + case COMPACTION_START: + logFileKey.filename = key.getColumnQualifierData().toString(); + break; + case DEFINE_TABLET: + try (DataInputBuffer buffer = new DataInputBuffer()) { + byte[] bytes = key.getColumnQualifierData().toArray(); + buffer.reset(bytes, bytes.length); + logFileKey.tablet = KeyExtent.readFrom(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + break; + case COMPACTION_FINISH: + case MANY_MUTATIONS: + case MUTATION: + // nothing to do + break; + default: + throw new AssertionError("Invalid event type in key: " + key); + } + + return logFileKey; + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java index 53a5ce9..b4ee431 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java @@ -20,15 +20,21 @@ package org.apache.accumulo.tserver.logger; import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; 
+import java.io.DataOutputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.server.data.ServerMutation; import org.apache.hadoop.io.Writable; @@ -93,4 +99,26 @@ public class LogFileValue implements Writable { return format(this, 5); } + /** + * Convert list of mutations to a byte array and use to create a Value + */ + public Value toValue() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + write(new DataOutputStream(baos)); + return new Value(baos.toByteArray()); + } + + /** + * Get the mutations from the Value + */ + public static LogFileValue fromValue(Value value) { + LogFileValue logFileValue = new LogFileValue(); + try (var bais = new ByteArrayInputStream(value.get())) { + logFileValue.readFields(new DataInputStream(bais)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return logFileValue; + } + } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index c6ce044..2121725 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@ -23,6 +23,10 @@ import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START; import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION; import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; 
+import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -41,10 +45,14 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.crypto.CryptoServiceFactory; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.data.ServerMutation; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.log.SortedLogState; @@ -53,9 +61,9 @@ import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.MapFile; -import org.apache.hadoop.io.MapFile.Writer; import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -65,15 +73,22 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input") public class SortedLogRecoveryTest { + static final int bufferSize = 5; static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null); static final Text cf = new Text("cf"); static final Text cq = new Text("cq"); static final Value value = new Value("value"); + static ServerContext context; @Rule public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); + @Before + public void setup() { + context = EasyMock.createMock(ServerContext.class); + } + 
  /**
   * Convenience overload: recovers with no tablet files and the class-default
   * {@link #bufferSize} flush threshold.
   */
  private List<Mutation> recover(Map<String,KeyValue[]> logs, KeyExtent extent)
      throws IOException {
    return recover(logs, new HashSet<>(), extent, bufferSize);
  }

  /**
   * Writes the given test WAL entries to sorted RFile parts with
   * {@code LogSorter.writeBuffer}, then runs {@code SortedLogRecovery} over them
   * and returns the mutations it replays for {@code extent}.
   *
   * @param logs map of log-directory name to the WAL entries to write into it
   * @param files tablet files already known to the tablet (used by recovery to
   *        skip already-flushed mutations)
   * @param extent tablet to recover
   * @param bufferSize number of entries buffered before flushing a new sorted
   *        part file; values smaller than the entry count force multiple parts
   */
  private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent,
      int bufferSize) throws IOException {

    final String workdir = tempFolder.newFolder().getAbsolutePath();
    try (var fs = VolumeManagerImpl.getLocalForTesting(workdir)) {
      // Expectations for everything LogSorter/SortedLogRecovery pull off the
      // context; replay() must happen before the first writeBuffer call.
      expect(context.getVolumeManager()).andReturn(fs).anyTimes();
      expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
          .anyTimes();
      expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
      replay(context);
      final Path workdirPath = new Path("file://" + workdir);
      fs.deleteRecursively(workdirPath);

      ArrayList<Path> dirs = new ArrayList<>();
      for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
        String destPath = workdir + "/" + entry.getKey();
        FileSystem ns = fs.getFileSystemByPath(new Path(destPath));
        // convert test object to Pairs for LogSorter, flushing based on bufferSize
        List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
        int parts = 0;
        for (KeyValue pair : entry.getValue()) {
          buffer.add(new Pair<>(pair.key, pair.value));
          if (buffer.size() >= bufferSize) {
            // Flush a full buffer as sorted part file "parts", then advance.
            LogSorter.writeBuffer(context, destPath, buffer, parts++);
            buffer.clear();
          }
        }
        // Final (possibly partial) buffer becomes the last part file.
        LogSorter.writeBuffer(context, destPath, buffer, parts);

        // The finished marker tells recovery this log dir is completely sorted.
        ns.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
        dirs.add(new Path(destPath));
      }
      // Recover
      SortedLogRecovery recovery = new SortedLogRecovery(context);
      CaptureMutations capture = new CaptureMutations();
      recovery.recover(extent, dirs, files, capture);
      // Fails the test if any recorded expectation went unused incorrectly.
      verify(context);
      return capture.result;
    }
  }
mutations2.size()); assertEquals(m4, mutations2.get(0)); @@ -785,7 +814,7 @@ public class SortedLogRecoveryTest { HashSet<String> filesSet = new HashSet<>(); filesSet.addAll(Arrays.asList(tabletFiles)); - List<Mutation> mutations = recover(logs, filesSet, extent); + List<Mutation> mutations = recover(logs, filesSet, extent, bufferSize); if (startMatches) { assertEquals(1, mutations.size()); @@ -802,6 +831,7 @@ public class SortedLogRecoveryTest { // test having different paths for the same file. This can happen as a result of upgrade or user // changing configuration runPathTest(false, "/t1/f1", "/t1/f0"); + reset(context); runPathTest(true, "/t1/f1", "/t1/f0", "/t1/f1"); String[] aliases = {"/t1/f1", "hdfs://nn1/accumulo/tables/8/t1/f1", @@ -812,9 +842,12 @@ public class SortedLogRecoveryTest { for (String alias1 : aliases) { for (String alias2 : aliases) { + reset(context); runPathTest(true, alias1, alias2); for (String other : others) { + reset(context); runPathTest(true, alias1, other, alias2); + reset(context); runPathTest(true, alias1, alias2, other); } } @@ -822,6 +855,7 @@ public class SortedLogRecoveryTest { for (String alias1 : aliases) { for (String other : others) { + reset(context); runPathTest(false, alias1, other); } } @@ -972,29 +1006,34 @@ public class SortedLogRecoveryTest { logs.put("entries2", entries2); + reset(context); mutations = recover(logs, extent); assertEquals(1, mutations.size()); assertEquals(m1, mutations.get(0)); logs.put("entries3", entries3); + reset(context); mutations = recover(logs, extent); assertEquals(1, mutations.size()); assertEquals(m1, mutations.get(0)); logs.put("entries4", entries4); + reset(context); mutations = recover(logs, extent); assertEquals(1, mutations.size()); assertEquals(m1, mutations.get(0)); logs.put("entries5", entries5); + reset(context); mutations = recover(logs, extent); assertEquals(0, mutations.size()); logs.put("entries6", entries6); + reset(context); mutations = recover(logs, extent); 
assertEquals(1, mutations.size()); assertEquals(m2, mutations.get(0)); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java index ed88901..23614bd 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java @@ -35,6 +35,7 @@ import java.io.OutputStream; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.crypto.CryptoServiceFactory; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; @@ -73,6 +74,8 @@ public class TestUpgradePathForWALogs { String path = workDir.getAbsolutePath(); assertTrue(workDir.delete()); VolumeManager fs = VolumeManagerImpl.getLocalForTesting(path); + expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance()) + .anyTimes(); expect(context.getVolumeManager()).andReturn(fs).anyTimes(); replay(context); }