This is an automated email from the ASF dual-hosted git repository. jmanno pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new f9f1d3f Update LogReader to utilize RecoveryLogsIterator (#2181) f9f1d3f is described below commit f9f1d3f6578c400862e9f0e8d82a49bf9c4a5392 Author: Jeffrey Manno <jeffreymann...@gmail.com> AuthorDate: Wed Jul 14 08:15:52 2021 -0400 Update LogReader to utilize RecoveryLogsIterator (#2181) * Adds utilization of RecoveryLogsIterator to read sorted Rfiles inside LogReader.java * Removed old implementation of RecoveryLogReader and removed associated test * Added unit test for RecoveryLogsIterator, RecoveryLogsIteratorTest Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../accumulo/tserver/log/RecoveryLogReader.java | 326 --------------------- .../accumulo/tserver/log/RecoveryLogsIterator.java | 4 +- .../apache/accumulo/tserver/logger/LogReader.java | 31 +- .../tserver/log/RecoveryLogsIteratorTest.java | 251 ++++++++++++++++ .../tserver/log/RecoveryLogsReaderTest.java | 216 -------------- 5 files changed, 273 insertions(+), 555 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java deleted file mode 100644 index bde5a1c..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.tserver.log; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.AbstractMap; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.PriorityQueue; - -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.log.SortedLogState; -import org.apache.accumulo.tserver.logger.LogEvents; -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.MapFile.Reader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; -import com.google.common.collect.PeekingIterator; - -/** - * A class which reads sorted recovery logs produced from a single WAL. - * - * Presently only supports next() and seek() and works on all the Map directories within a - * directory. The primary purpose of this class is to merge the results of multiple Reduce jobs that - * result in Map output files. - */ -public class RecoveryLogReader implements CloseableIterator<Entry<LogFileKey,LogFileValue>> { - - /** - * Group together the next key/value from a Reader with the Reader - */ - private static class Index implements Comparable<Index> { - Reader reader; - WritableComparable<?> key; - Writable value; - boolean cached = false; - - private static Object create(java.lang.Class<?> klass) { - try { - return klass.getConstructor().newInstance(); - } catch (Exception t) { - throw new RuntimeException("Unable to construct objects to use for comparison"); - } - } - - public Index(Reader reader) { - this.reader = reader; - key = (WritableComparable<?>) create(reader.getKeyClass()); - value = (Writable) create(reader.getValueClass()); - } - - private void cache() throws IOException { - if (!cached && reader.next(key, value)) { - cached = true; - } - } - - @Override - public int hashCode() { - return Objects.hashCode(key); - } - - @Override - public boolean equals(Object obj) { - return this == obj || (obj != null && obj instanceof Index && compareTo((Index) obj) == 0); - } - - @Override - public int compareTo(Index o) { - try { - cache(); - o.cache(); - // no more data: always goes to the end - if (!cached) - return 1; - if (!o.cached) - return -1; - @SuppressWarnings({"unchecked", "rawtypes"}) - int result = ((WritableComparable) key).compareTo(o.key); - return result; - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } - } - } - - private PriorityQueue<Index> heap = new PriorityQueue<>(); - private Iterator<Entry<LogFileKey,LogFileValue>> iter; - - public RecoveryLogReader(VolumeManager fs, Path directory) throws IOException { - this(fs, directory, null, null); - } - - public RecoveryLogReader(VolumeManager fs, Path directory, LogFileKey start, LogFileKey end) - throws IOException { - boolean foundFinish = false; - for (FileStatus child : fs.listStatus(directory)) { - if (child.getPath().getName().startsWith("_")) - continue; - if (SortedLogState.isFinished(child.getPath().getName())) { - foundFinish = true; - continue; - } - if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) { - continue; - } - FileSystem ns = fs.getFileSystemByPath(child.getPath()); - heap.add(new Index(new Reader(ns.makeQualified(child.getPath()), ns.getConf()))); - } - if (!foundFinish) - throw new IOException( - "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory); - - iter = new SortCheckIterator(new RangeIterator(start, end)); - } - - private static void copy(Writable src, Writable dest) throws IOException { - // not exactly efficient... - DataOutputBuffer output = new DataOutputBuffer(); - src.write(output); - DataInputBuffer input = new DataInputBuffer(); - input.reset(output.getData(), output.getLength()); - dest.readFields(input); - } - - @VisibleForTesting - synchronized boolean next(WritableComparable<?> key, Writable val) throws IOException { - Index elt = heap.remove(); - try { - elt.cache(); - if (elt.cached) { - copy(elt.key, key); - copy(elt.value, val); - elt.cached = false; - } else { - return false; - } - } finally { - heap.add(elt); - } - return true; - } - - @VisibleForTesting - synchronized boolean seek(WritableComparable<?> key) throws IOException { - PriorityQueue<Index> reheap = new PriorityQueue<>(heap.size()); - boolean result = false; - for (Index index : heap) { - try { - WritableComparable<?> found = index.reader.getClosest(key, index.value, true); - if (found != null && found.equals(key)) { - result = true; - } - } catch (EOFException ex) { - // thrown if key is beyond all data in the map - } - index.cached = false; - reheap.add(index); - } - heap = reheap; - return result; - } - - @Override - public void close() throws IOException { - IOException problem = null; - for (Index index : heap) { - try { - index.reader.close(); - } catch (IOException ex) { - problem = ex; - } - } - if (problem != null) - throw problem; - heap = null; - } - - /** - * Ensures source iterator provides data in sorted order - */ - @VisibleForTesting - static class SortCheckIterator implements Iterator<Entry<LogFileKey,LogFileValue>> { - - private PeekingIterator<Entry<LogFileKey,LogFileValue>> source; - - SortCheckIterator(Iterator<Entry<LogFileKey,LogFileValue>> source) { - this.source = Iterators.peekingIterator(source); - - } - - @Override - public boolean hasNext() { - return source.hasNext(); - } - - @Override - public Entry<LogFileKey,LogFileValue> next() { - Entry<LogFileKey,LogFileValue> next = source.next(); - if (source.hasNext()) { - Preconditions.checkState(next.getKey().compareTo(source.peek().getKey()) <= 0, - "Keys not in order %s %s", next.getKey(), source.peek().getKey()); - } - return next; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove"); - } - } - - private class RangeIterator implements Iterator<Entry<LogFileKey,LogFileValue>> { - - private LogFileKey key = new LogFileKey(); - private LogFileValue value = new LogFileValue(); - private boolean hasNext; - private LogFileKey end; - - private boolean next(LogFileKey key, LogFileValue value) throws IOException { - try { - return RecoveryLogReader.this.next(key, value); - } catch (EOFException e) { - return false; - } - } - - RangeIterator(LogFileKey start, LogFileKey end) throws IOException { - this.end = end; - - if (start != null) { - hasNext = next(key, value); - - if (hasNext && key.event != LogEvents.OPEN) { - throw new IllegalStateException("First log entry value is not OPEN"); - } - - seek(start); - } - - hasNext = next(key, value); - - if (hasNext && start != null && key.compareTo(start) < 0) { - throw new IllegalStateException("First key is less than start " + key + " " + start); - } - - if (hasNext && end != null && key.compareTo(end) > 0) { - hasNext = false; - } - } - - @Override - public boolean hasNext() { - return hasNext; - } - - @Override - public Entry<LogFileKey,LogFileValue> next() { - Preconditions.checkState(hasNext); - Entry<LogFileKey,LogFileValue> entry = new AbstractMap.SimpleImmutableEntry<>(key, value); - - key = new LogFileKey(); - value = new LogFileValue(); - try { - hasNext = next(key, value); - if (hasNext && end != null && key.compareTo(end) > 0) { - hasNext = false; - } - } catch (IOException e) { - throw new IllegalStateException(e); - } - - return entry; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove"); - } - } - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public Entry<LogFileKey,LogFileValue> next() { - return iter.next(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("remove"); - } - -} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index 0f5e259..75999d7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ -59,12 +59,12 @@ public class RecoveryLogsIterator /** * Scans the files in each recoveryLogDir over the range [start,end]. */ - RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start, + public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start, LogFileKey end, boolean checkFirstKey) throws IOException { List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size()); scanners = new ArrayList<>(); - Range range = LogFileKey.toRange(start, end); + Range range = start == null ? null : LogFileKey.toRange(start, end); var vm = context.getVolumeManager(); for (Path logDir : recoveryLogDirs) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java index 5c0ce29..a984418 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java @@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.io.DataInputStream; import java.io.EOFException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; @@ -35,11 +36,13 @@ import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.accumulo.tserver.log.DfsLogger; import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException; -import org.apache.accumulo.tserver.log.RecoveryLogReader; +import org.apache.accumulo.tserver.log.RecoveryLogsIterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; @@ -72,7 +75,7 @@ public class LogReader implements KeywordExecutable { } /** - * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system. + * Dump a Log File to stdout. Will read from HDFS or local file system. * * @param args * - first argument is the file to print @@ -103,7 +106,8 @@ public class LogReader implements KeywordExecutable { } var siteConfig = SiteConfiguration.auto(); - try (var fs = VolumeManagerImpl.get(siteConfig, new Configuration())) { + ServerContext context = new ServerContext(siteConfig); + try (VolumeManager fs = VolumeManagerImpl.get(siteConfig, new Configuration())) { Matcher rowMatcher = null; KeyExtent ke = null; @@ -123,13 +127,18 @@ public class LogReader implements KeywordExecutable { Set<Integer> tabletIds = new HashSet<>(); for (String file : opts.files) { - Path path = new Path(file); LogFileKey key = new LogFileKey(); LogFileValue value = new LogFileValue(); + // ensure it's a regular non-sorted WAL file, and not a single sorted WAL in RFile format if (fs.getFileStatus(path).isFile()) { - // read log entries from a simple hdfs file + if (file.endsWith(".rf")) { + log.error("Unable to read from a single RFile. A non-sorted WAL file was expected. " + + "To read sorted WALs, please pass in a directory containing the sorted recovery logs."); + continue; + } + try (final FSDataInputStream fsinput = fs.open(path); DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) { while (true) { @@ -146,10 +155,12 @@ public class LogReader implements KeywordExecutable { continue; } } else { - // read the log entries sorted in a map file - try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) { - while (input.hasNext()) { - Entry<LogFileKey,LogFileValue> entry = input.next(); + // read the log entries in a sorted RFile. This has to be a directory that contains the + // finished file. + try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), null, + null, false)) { + while (rli.hasNext()) { + Entry<LogFileKey,LogFileValue> entry = rli.next(); printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds, opts.maxMutations); } @@ -200,9 +211,7 @@ public class LogReader implements KeywordExecutable { } } - System.out.println(key); System.out.println(LogFileValue.format(value, maxMutations)); } - } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java new file mode 100644 index 0000000..d6a013e --- /dev/null +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.tserver.log; + +import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; +import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.TreeMap; + +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.crypto.CryptoServiceFactory; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.log.SortedLogState; +import org.apache.accumulo.tserver.logger.LogFileKey; +import org.apache.accumulo.tserver.logger.LogFileValue; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input") +public class RecoveryLogsIteratorTest { + + private VolumeManager fs; + private File workDir; + static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null); + static ServerContext context; + static LogSorter logSorter; + + @Rule + public TemporaryFolder tempFolder = + new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); + + @Before + public void setUp() throws Exception { + context = createMock(ServerContext.class); + logSorter = new LogSorter(context, DefaultConfiguration.getInstance()); + + workDir = tempFolder.newFolder(); + String path = workDir.getAbsolutePath(); + assertTrue(workDir.delete()); + fs = VolumeManagerImpl.getLocalForTesting(path); + expect(context.getVolumeManager()).andReturn(fs).anyTimes(); + expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance()) + .anyTimes(); + expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + replay(context); + } + + @After + public void tearDown() throws Exception { + fs.close(); + } + + static class KeyValue implements Comparable<KeyValue> { + public final LogFileKey key; + public final LogFileValue value; + + KeyValue() { + key = new LogFileKey(); + value = new LogFileValue(); + } + + @Override + public int hashCode() { + return Objects.hashCode(key) + Objects.hashCode(value); + } + + @Override + public boolean equals(Object obj) { + return this == obj || (obj instanceof KeyValue && 0 == compareTo((KeyValue) obj)); + } + + @Override + public int compareTo(KeyValue o) { + return key.compareTo(o.key); + } + } + + @Test + public void testSimpleRLI() throws IOException { + KeyValue keyValue = new KeyValue(); + keyValue.key.event = DEFINE_TABLET; + keyValue.key.seq = 0; + keyValue.key.tabletId = 1; + keyValue.key.tablet = extent; + + KeyValue[] keyValues = {keyValue}; + + Map<String,KeyValue[]> logs = new TreeMap<>(); + logs.put("keyValues", keyValues); + + ArrayList<Path> dirs = new ArrayList<>(); + + createRecoveryDir(logs, dirs, true); + + try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) { + while (rli.hasNext()) { + Entry<LogFileKey,LogFileValue> entry = rli.next(); + assertEquals("TabletId does not match", 1, entry.getKey().tabletId); + assertEquals("Event does not match", DEFINE_TABLET, entry.getKey().event); + } + } + } + + @Test + public void testFinishMarker() throws IOException { + KeyValue keyValue = new KeyValue(); + keyValue.key.event = DEFINE_TABLET; + keyValue.key.seq = 0; + keyValue.key.tabletId = 1; + keyValue.key.tablet = extent; + + KeyValue[] keyValues = {keyValue}; + + Map<String,KeyValue[]> logs = new TreeMap<>(); + logs.put("keyValues", keyValues); + + ArrayList<Path> dirs = new ArrayList<>(); + + createRecoveryDir(logs, dirs, false); + + assertThrows("Finish marker should not be found", IOException.class, + () -> new RecoveryLogsIterator(context, dirs, null, null, false)); + } + + @Test + public void testSingleFile() throws IOException { + String destPath = workDir + "/test.rf"; + fs.create(new Path(destPath)); + + assertThrows("Finish marker should not be found for a single file.", IOException.class, + () -> new RecoveryLogsIterator(context, Collections.singletonList(new Path(destPath)), null, + null, false)); + } + + @Test + public void testCheckFirstKeyFailed() throws IOException { + KeyValue keyValue = new KeyValue(); + keyValue.key.event = DEFINE_TABLET; + keyValue.key.seq = 0; + keyValue.key.tabletId = 1; + keyValue.key.tablet = extent; + + KeyValue[] keyValues = {keyValue}; + + Map<String,KeyValue[]> logs = new TreeMap<>(); + logs.put("keyValues", keyValues); + + ArrayList<Path> dirs = new ArrayList<>(); + + createRecoveryDir(logs, dirs, true); + + assertThrows("First log entry is not OPEN so exception should be thrown.", + IllegalStateException.class, + () -> new RecoveryLogsIterator(context, dirs, null, null, true)); + } + + @Test + public void testCheckFirstKeyPass() throws IOException { + KeyValue keyValue1 = new KeyValue(); + keyValue1.key.event = OPEN; + keyValue1.key.seq = 0; + keyValue1.key.tabletId = -1; + keyValue1.key.tserverSession = "1"; + + KeyValue keyValue2 = new KeyValue(); + keyValue2.key.event = DEFINE_TABLET; + keyValue2.key.seq = 0; + keyValue2.key.tabletId = 1; + keyValue2.key.tablet = extent; + + KeyValue[] keyValues = {keyValue1, keyValue2}; + + Map<String,KeyValue[]> logs = new TreeMap<>(); + logs.put("keyValues", keyValues); + + ArrayList<Path> dirs = new ArrayList<>(); + + createRecoveryDir(logs, dirs, true); + + try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, true)) { + while (rli.hasNext()) { + Entry<LogFileKey,LogFileValue> entry = rli.next(); + assertNotNull(entry.getKey()); + } + } + } + + private void createRecoveryDir(Map<String,KeyValue[]> logs, ArrayList<Path> dirs, + boolean FinishMarker) throws IOException { + + for (Entry<String,KeyValue[]> entry : logs.entrySet()) { + String destPath = workDir + "/dir"; + FileSystem ns = fs.getFileSystemByPath(new Path(destPath)); + + // convert test object to Pairs for LogSorter. + List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>(); + for (KeyValue pair : entry.getValue()) { + buffer.add(new Pair<>(pair.key, pair.value)); + } + logSorter.writeBuffer(destPath, buffer, 0); + + if (FinishMarker) + ns.create(SortedLogState.getFinishedMarkerPath(destPath)); + + dirs.add(new Path(destPath)); + } + } +} diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java deleted file mode 100644 index 5a3a0af..0000000 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.tserver.log; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.fs.VolumeManagerImpl; -import org.apache.accumulo.server.log.SortedLogState; -import org.apache.accumulo.tserver.log.RecoveryLogReader.SortCheckIterator; -import org.apache.accumulo.tserver.logger.LogEvents; -import org.apache.accumulo.tserver.logger.LogFileKey; -import org.apache.accumulo.tserver.logger.LogFileValue; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.MapFile.Writer; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; - -@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input") -public class RecoveryLogsReaderTest { - - private VolumeManager fs; - private File workDir; - - @Rule - public TemporaryFolder tempFolder = - new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); - - @Before - public void setUp() throws Exception { - workDir = tempFolder.newFolder(); - String path = workDir.getAbsolutePath(); - assertTrue(workDir.delete()); - fs = VolumeManagerImpl.getLocalForTesting(path); - Path root = new Path("file://" + path); - fs.mkdirs(root); - fs.create(new Path(root, "finished")).close(); - FileSystem ns = fs.getFileSystemByPath(root); - - Writer oddWriter = new Writer(ns.getConf(), ns.makeQualified(new Path(root, "odd")), - Writer.keyClass(IntWritable.class), Writer.valueClass(BytesWritable.class)); - BytesWritable value = new BytesWritable("someValue".getBytes()); - for (int i = 1; i < 1000; i += 2) { - oddWriter.append(new IntWritable(i), value); - } - oddWriter.close(); - - Writer evenWriter = new Writer(ns.getConf(), ns.makeQualified(new Path(root, "even")), - Writer.keyClass(IntWritable.class), Writer.valueClass(BytesWritable.class)); - for (int i = 0; i < 1000; i += 2) { - if (i == 10) - continue; - evenWriter.append(new IntWritable(i), value); - } - evenWriter.close(); - } - - @After - public void tearDown() throws Exception { - fs.close(); - } - - private void scan(RecoveryLogReader reader, int start) throws IOException { - IntWritable key = new IntWritable(); - BytesWritable value = new BytesWritable(); - - for (int i = start + 1; i < 1000; i++) { - if (i == 10) - continue; - assertTrue(reader.next(key, value)); - assertEquals(i, key.get()); - } - } - - private void scanOdd(RecoveryLogReader reader, int start) throws IOException { - IntWritable key = new IntWritable(); - BytesWritable value = new BytesWritable(); - - for (int i = start + 2; i < 1000; i += 2) { - assertTrue(reader.next(key, value)); - assertEquals(i, key.get()); - } - } - - @Test - public void testMultiReader() throws IOException { - Path manyMaps = new Path("file://" + workDir.getAbsolutePath()); - RecoveryLogReader reader = new RecoveryLogReader(fs, manyMaps); - IntWritable key = new IntWritable(); - BytesWritable value = new BytesWritable(); - - for (int i = 0; i < 1000; i++) { - if (i == 10) - continue; - assertTrue(reader.next(key, value)); - assertEquals(i, key.get()); - } - assertEquals(value.compareTo(new BytesWritable("someValue".getBytes())), 0); - assertFalse(reader.next(key, value)); - - key.set(500); - assertTrue(reader.seek(key)); - scan(reader, 500); - key.set(10); - assertFalse(reader.seek(key)); - scan(reader, 10); - key.set(1000); - assertFalse(reader.seek(key)); - assertFalse(reader.next(key, value)); - key.set(-1); - assertFalse(reader.seek(key)); - key.set(0); - assertTrue(reader.next(key, value)); - assertEquals(0, key.get()); - reader.close(); - - fs.deleteRecursively(new Path(manyMaps, "even")); - reader = new RecoveryLogReader(fs, manyMaps); - key.set(501); - assertTrue(reader.seek(key)); - scanOdd(reader, 501); - key.set(1000); - assertFalse(reader.seek(key)); - assertFalse(reader.next(key, value)); - key.set(-1); - assertFalse(reader.seek(key)); - key.set(1); - assertTrue(reader.next(key, value)); - assertEquals(1, key.get()); - reader.close(); - - } - - @Test(expected = IllegalStateException.class) - public void testSortCheck() { - - List<Entry<LogFileKey,LogFileValue>> unsorted = new ArrayList<>(); - - LogFileKey k1 = new LogFileKey(); - k1.event = LogEvents.MANY_MUTATIONS; - k1.tabletId = 2; - k1.seq = 55; - - LogFileKey k2 = new LogFileKey(); - k2.event = LogEvents.MANY_MUTATIONS; - k2.tabletId = 9; - k2.seq = 9; - - unsorted.add(new AbstractMap.SimpleEntry<>(k2, (LogFileValue) null)); - unsorted.add(new AbstractMap.SimpleEntry<>(k1, (LogFileValue) null)); - - SortCheckIterator iter = new SortCheckIterator(unsorted.iterator()); - - while (iter.hasNext()) { - iter.next(); - } - } - - /** - * Test a failed marker doesn't cause issues. See Github issue - * https://github.com/apache/accumulo/issues/961 - */ - @Test - public void testFailed() throws Exception { - Path manyMaps = new Path("file://" + workDir.getAbsolutePath()); - fs.create(new Path(manyMaps, SortedLogState.FAILED.getMarker())).close(); - - RecoveryLogReader reader = new RecoveryLogReader(fs, manyMaps); - IntWritable key = new IntWritable(); - BytesWritable value = new BytesWritable(); - - for (int i = 0; i < 1000; i++) { - if (i == 10) - continue; - assertTrue(reader.next(key, value)); - assertEquals(i, key.get()); - } - reader.close(); - - assertTrue(fs.delete(new Path(manyMaps, SortedLogState.FAILED.getMarker()))); - } - -}