This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 3ea979d Add new property to configure rfile sorted recovery (#2236) 3ea979d is described below commit 3ea979d5971016b9c2512a96cc5b6bd5d1969b2e Author: Mike Miller <mmil...@apache.org> AuthorDate: Fri Aug 20 10:44:08 2021 -0400 Add new property to configure rfile sorted recovery (#2236) * Create new property prefix "tserver.wal.sort.file." to configure the rfiles written during sorted recovery * Add method to LogSorter to convert the sort file properties to table files properties * Create new tests in SortedLogRecoveryTest * Make method public in Compression to use in test * Add property to MultiTableRecoveryIT to allow testing in an IT --- .../org/apache/accumulo/core/conf/Property.java | 5 + .../core/file/rfile/bcfile/Compression.java | 2 +- .../org/apache/accumulo/tserver/log/LogSorter.java | 33 ++++-- .../tserver/log/SortedLogRecoveryTest.java | 112 +++++++++++++++++++++ .../apache/accumulo/test/MultiTableRecoveryIT.java | 2 + 5 files changed, 147 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index f97e802..e41f898 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -675,6 +675,11 @@ public enum Property { "The maximum number of threads to use to sort logs during" + " recovery", "1.5.0"), TSERV_SORT_BUFFER_SIZE("tserver.sort.buffer.size", "10%", PropertyType.MEMORY, "The amount of memory to use when sorting logs during recovery.", "1.5.0"), + TSERV_WAL_SORT_FILE_PREFIX("tserver.wal.sort.file.", null, PropertyType.PREFIX, + "The rfile properties to use when sorting logs during recovery. Most of the properties" + + " that begin with 'table.file' can be used here. For example, to set the compression" + + " of the sorted recovery files to snappy use 'tserver.wal.sort.file.compress.type=snappy'", + "2.1.0"), TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT, "The number of threads for the distributed work queue. These threads are" + " used for copying failed bulk import RFiles.", diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java index dcd41a6..d4089df 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java @@ -696,7 +696,7 @@ public final class Compression { return supportedAlgorithms.toArray(new String[0]); } - static Algorithm getCompressionAlgorithmByName(final String name) { + public static Algorithm getCompressionAlgorithmByName(final String name) { Algorithm[] algorithms = Algorithm.class.getEnumConstants(); for (Algorithm algorithm : algorithms) { if (algorithm.getName().equals(name)) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index bd299af..3d593dc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -59,7 +60,7 @@ import com.google.common.annotations.VisibleForTesting; public class LogSorter { private static final Logger log = LoggerFactory.getLogger(LogSorter.class); - AccumuloConfiguration conf; + AccumuloConfiguration sortedLogConf; private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<>()); @@ -139,7 +140,7 @@ public class LogSorter { input = fs.open(srcPath); try { - decryptingInput = DfsLogger.getDecryptingStream(input, conf); + decryptingInput = DfsLogger.getDecryptingStream(input, sortedLogConf); } catch (LogHeaderIncompleteException e) { log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath); // Creating a 'finished' marker will cause recovery to proceed normally and the @@ -150,7 +151,7 @@ public class LogSorter { return; } - final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE); + final long bufferSize = sortedLogConf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE); Thread.currentThread().setName("Sorting " + name + " for recovery"); while (true) { final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>(); @@ -206,13 +207,33 @@ public class LogSorter { public LogSorter(ServerContext context, AccumuloConfiguration conf) { this.context = context; - this.conf = conf; + this.sortedLogConf = extractSortedLogConfig(conf); int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT); this.threadPool = ThreadPools.createFixedThreadPool(threadPoolSize, this.getClass().getName(), false); this.walBlockSize = DfsLogger.getWalBlockSize(conf); } + /** + * Get the properties set with {@link Property#TSERV_WAL_SORT_FILE_PREFIX} and translate them to + * equivalent 'table.file' properties to be used when writing rfiles for sorted recovery. + */ + private AccumuloConfiguration extractSortedLogConfig(AccumuloConfiguration conf) { + final String tablePrefix = "table.file."; + var props = conf.getAllPropertiesWithPrefixStripped(Property.TSERV_WAL_SORT_FILE_PREFIX); + ConfigurationCopy copy = new ConfigurationCopy(conf); + props.forEach((prop, val) -> { + String tableProp = tablePrefix + prop; + if (Property.isTablePropertyValid(tableProp, val)) { + log.debug("Using property for writing sorted files: {}={}", tableProp, val); + copy.set(tableProp, val); + } else { + throw new IllegalArgumentException("Invalid sort file property " + prop + "=" + val); + } + }); + return copy; + } + @VisibleForTesting void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException { @@ -237,7 +258,7 @@ public class LogSorter { try (var writer = FileOperations.getInstance().newWriterBuilder() .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService()) - .withTableConfiguration(conf).build()) { + .withTableConfiguration(sortedLogConf).build()) { writer.startDefaultLocalityGroup(); for (var entry : keyListMap.entrySet()) { LogFileValue val = new LogFileValue(); @@ -250,7 +271,7 @@ public class LogSorter { public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException { this.threadPool = distWorkQThreadPool; - new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, conf) + new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf) .startProcessing(new LogProcessor(), this.threadPool); } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java index 582b9bc..0a2720b 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java @@ -45,12 +45,17 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.crypto.CryptoServiceFactory; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.file.rfile.bcfile.Compression; +import org.apache.accumulo.core.file.rfile.bcfile.Utils; +import org.apache.accumulo.core.file.streams.SeekableDataInputStream; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.data.ServerMutation; @@ -59,6 +64,7 @@ import org.apache.accumulo.server.log.SortedLogState; import org.apache.accumulo.tserver.logger.LogEvents; import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -1064,4 +1070,110 @@ public class SortedLogRecoveryTest { var e = assertThrows(IllegalStateException.class, () -> recover(logs, extent)); assertTrue(e.getMessage().contains("not " + LogEvents.OPEN)); } + + @Test + public void testInvalidLogSortedProperties() { + ConfigurationCopy testConfig = new ConfigurationCopy(DefaultConfiguration.getInstance()); + // test all the possible properties for tserver.sort.file. prefix + String prop = Property.TSERV_WAL_SORT_FILE_PREFIX + "invalid"; + testConfig.set(prop, "snappy"); + try { + new LogSorter(context, testConfig); + fail("Did not throw IllegalArgumentException for " + prop); + } catch (IllegalArgumentException e) { + // valid for test + } + } + + @Test + public void testLogSortedProperties() throws Exception { + Mutation ignored = new ServerMutation(new Text("ignored")); + ignored.put(cf, cq, value); + Mutation m = new ServerMutation(new Text("row1")); + m.put(cf, cq, value); + ConfigurationCopy testConfig = new ConfigurationCopy(DefaultConfiguration.getInstance()); + String sortFileCompression = "none"; + // test all the possible properties for tserver.sort.file. prefix + String prefix = Property.TSERV_WAL_SORT_FILE_PREFIX.toString(); + testConfig.set(prefix + "compress.type", sortFileCompression); + testConfig.set(prefix + "compress.blocksize", "50K"); + testConfig.set(prefix + "compress.blocksize.index", "56K"); + testConfig.set(prefix + "blocksize", "256B"); + testConfig.set(prefix + "replication", "3"); + LogSorter sorter = new LogSorter(context, testConfig); + + final String workdir = tempFolder.newFolder().getAbsolutePath(); + + try (var vm = VolumeManagerImpl.getLocalForTesting(workdir)) { + expect(context.getVolumeManager()).andReturn(vm).anyTimes(); + expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance()) + .anyTimes(); + expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + replay(context); + final Path workdirPath = new Path("file://" + workdir); + vm.deleteRecursively(workdirPath); + + KeyValue[] events = + {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent), + createKeyValue(COMPACTION_FINISH, 2, 1, null), + createKeyValue(COMPACTION_START, 4, 1, "/t1/f1"), + createKeyValue(COMPACTION_FINISH, 5, 1, null), + createKeyValue(MUTATION, 3, 1, ignored), createKeyValue(MUTATION, 5, 1, m)}; + String dest = workdir + "/testLogSortedProperties"; + + List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>(); + int parts = 0; + for (KeyValue pair : events) { + buffer.add(new Pair<>(pair.key, pair.value)); + if (buffer.size() >= bufferSize) { + sorter.writeBuffer(dest, buffer, parts++); + buffer.clear(); + } + } + sorter.writeBuffer(dest, buffer, parts); + FileSystem fs = vm.getFileSystemByPath(workdirPath); + + // check contents of directory + for (var file : fs.listStatus(new Path(dest))) { + assertTrue(file.isFile()); + try (var fileStream = fs.open(file.getPath())) { + var algo = getCompressionFromRFile(fileStream, file.getLen()); + assertEquals(sortFileCompression, algo.getName()); + } + } + } + } + + /** + * Pulled from BCFile.Reader() + */ + private final Utils.Version API_VERSION_3 = new Utils.Version((short) 3, (short) 0); + private final String defaultPrefix = "data:"; + + private Compression.Algorithm getCompressionFromRFile(FSDataInputStream fsin, long fileLength) + throws IOException { + try (var in = new SeekableDataInputStream(fsin)) { + int magicNumberSize = 16; // BCFile.Magic.size(); + // Move the cursor to grab the version and the magic first + in.seek(fileLength - magicNumberSize - Utils.Version.size()); + var version = new Utils.Version(in); + assertEquals(API_VERSION_3, version); + in.readFully(new byte[16]); // BCFile.Magic.readAndVerify(in); // 16 bytes + in.seek(fileLength - magicNumberSize - Utils.Version.size() - 16); // 2 * Long.BYTES = 16 + long offsetIndexMeta = in.readLong(); + long offsetCryptoParameters = in.readLong(); + assertTrue(offsetCryptoParameters > 0); + + // read meta index + in.seek(offsetIndexMeta); + int count = Utils.readVInt(in); + assertTrue(count > 0); + + String fullMetaName = Utils.readString(in); + if (fullMetaName != null && !fullMetaName.startsWith(defaultPrefix)) { + throw new IOException("Corrupted Meta region Index"); + } + return Compression.getCompressionAlgorithmByName(Utils.readString(in)); + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java index e66b380..4f91ec6 100644 --- a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java @@ -57,6 +57,8 @@ public class MultiTableRecoveryIT extends ConfigurableMacBase { // use raw local file system so walogs sync and flush will work hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + // test sorted rfile recovery options + cfg.setProperty(Property.TSERV_WAL_SORT_FILE_PREFIX + "compress.type", "none"); } @Override