This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ea979d  Add new property to configure rfile sorted recovery (#2236)
3ea979d is described below

commit 3ea979d5971016b9c2512a96cc5b6bd5d1969b2e
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Fri Aug 20 10:44:08 2021 -0400

    Add new property to configure rfile sorted recovery (#2236)
    
    * Create new property prefix "tserver.wal.sort.file." to configure the
    rfiles written during sorted recovery (see the example below)
    * Add method to LogSorter to convert the sort file properties to table
    files properties
    * Create new tests in SortedLogRecoveryTest
    * Make method public in Compression to use in test
    * Add property to MultiTableRecoveryIT to allow testing in an IT
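    
    For example, with this change a deployment could write the sorted recovery
    rfiles with snappy compression by adding settings such as the following to
    accumulo.properties (illustrative values; per the new property description,
    most 'table.file' properties can be used under the prefix):
    
        tserver.wal.sort.file.compress.type=snappy
        tserver.wal.sort.file.blocksize=256B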
---
 .../org/apache/accumulo/core/conf/Property.java    |   5 +
 .../core/file/rfile/bcfile/Compression.java        |   2 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  33 ++++--
 .../tserver/log/SortedLogRecoveryTest.java         | 112 +++++++++++++++++++++
 .../apache/accumulo/test/MultiTableRecoveryIT.java |   2 +
 5 files changed, 147 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f97e802..e41f898 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -675,6 +675,11 @@ public enum Property {
       "The maximum number of threads to use to sort logs during" + " 
recovery", "1.5.0"),
   TSERV_SORT_BUFFER_SIZE("tserver.sort.buffer.size", "10%", 
PropertyType.MEMORY,
       "The amount of memory to use when sorting logs during recovery.", 
"1.5.0"),
+  TSERV_WAL_SORT_FILE_PREFIX("tserver.wal.sort.file.", null, 
PropertyType.PREFIX,
+      "The rfile properties to use when sorting logs during recovery. Most of 
the properties"
+          + " that begin with 'table.file' can be used here. For example, to 
set the compression"
+          + " of the sorted recovery files to snappy use 
'tserver.wal.sort.file.compress.type=snappy'",
+      "2.1.0"),
   TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
       "The number of threads for the distributed work queue. These threads are"
           + " used for copying failed bulk import RFiles.",
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index dcd41a6..d4089df 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -696,7 +696,7 @@ public final class Compression {
     return supportedAlgorithms.toArray(new String[0]);
   }
 
-  static Algorithm getCompressionAlgorithmByName(final String name) {
+  public static Algorithm getCompressionAlgorithmByName(final String name) {
     Algorithm[] algorithms = Algorithm.class.getEnumConstants();
     for (Algorithm algorithm : algorithms) {
       if (algorithm.getName().equals(name)) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index bd299af..3d593dc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -59,7 +60,7 @@ import com.google.common.annotations.VisibleForTesting;
 public class LogSorter {
 
   private static final Logger log = LoggerFactory.getLogger(LogSorter.class);
-  AccumuloConfiguration conf;
+  AccumuloConfiguration sortedLogConf;
 
   private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<>());
 
@@ -139,7 +140,7 @@ public class LogSorter {
 
       input = fs.open(srcPath);
       try {
-        decryptingInput = DfsLogger.getDecryptingStream(input, conf);
+        decryptingInput = DfsLogger.getDecryptingStream(input, sortedLogConf);
       } catch (LogHeaderIncompleteException e) {
         log.warn("Could not read header from write-ahead log {}. Not 
sorting.", srcPath);
         // Creating a 'finished' marker will cause recovery to proceed 
normally and the
@@ -150,7 +151,7 @@ public class LogSorter {
         return;
       }
 
-      final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
+      final long bufferSize = sortedLogConf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
       Thread.currentThread().setName("Sorting " + name + " for recovery");
       while (true) {
         final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
@@ -206,13 +207,33 @@ public class LogSorter {
 
   public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.context = context;
-    this.conf = conf;
+    this.sortedLogConf = extractSortedLogConfig(conf);
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
     this.threadPool =
         ThreadPools.createFixedThreadPool(threadPoolSize, this.getClass().getName(), false);
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
+  /**
+   * Get the properties set with {@link Property#TSERV_WAL_SORT_FILE_PREFIX} and translate them to
+   * equivalent 'table.file' properties to be used when writing rfiles for sorted recovery.
+   */
+  private AccumuloConfiguration extractSortedLogConfig(AccumuloConfiguration conf) {
+    final String tablePrefix = "table.file.";
+    var props = conf.getAllPropertiesWithPrefixStripped(Property.TSERV_WAL_SORT_FILE_PREFIX);
+    ConfigurationCopy copy = new ConfigurationCopy(conf);
+    props.forEach((prop, val) -> {
+      String tableProp = tablePrefix + prop;
+      if (Property.isTablePropertyValid(tableProp, val)) {
+        log.debug("Using property for writing sorted files: {}={}", tableProp, 
val);
+        copy.set(tableProp, val);
+      } else {
+        throw new IllegalArgumentException("Invalid sort file property " + 
prop + "=" + val);
+      }
+    });
+    return copy;
+  }
+
   @VisibleForTesting
   void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part)
       throws IOException {
@@ -237,7 +258,7 @@ public class LogSorter {
 
     try (var writer = FileOperations.getInstance().newWriterBuilder()
         .forFile(fullPath.toString(), fs, fs.getConf(), context.getCryptoService())
-        .withTableConfiguration(conf).build()) {
+        .withTableConfiguration(sortedLogConf).build()) {
       writer.startDefaultLocalityGroup();
       for (var entry : keyListMap.entrySet()) {
         LogFileValue val = new LogFileValue();
@@ -250,7 +271,7 @@ public class LogSorter {
   public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
       throws KeeperException, InterruptedException {
     this.threadPool = distWorkQThreadPool;
-    new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, conf)
+    new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf)
         .startProcessing(new LogProcessor(), this.threadPool);
   }
 
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index 582b9bc..0a2720b 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -45,12 +45,17 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.Utils;
+import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.data.ServerMutation;
@@ -59,6 +64,7 @@ import org.apache.accumulo.server.log.SortedLogState;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -1064,4 +1070,110 @@ public class SortedLogRecoveryTest {
     var e = assertThrows(IllegalStateException.class, () -> recover(logs, extent));
     assertTrue(e.getMessage().contains("not " + LogEvents.OPEN));
   }
+
+  @Test
+  public void testInvalidLogSortedProperties() {
+    ConfigurationCopy testConfig = new ConfigurationCopy(DefaultConfiguration.getInstance());
+    // set an invalid property under the tserver.wal.sort.file. prefix
+    String prop = Property.TSERV_WAL_SORT_FILE_PREFIX + "invalid";
+    testConfig.set(prop, "snappy");
+    try {
+      new LogSorter(context, testConfig);
+      fail("Did not throw IllegalArgumentException for " + prop);
+    } catch (IllegalArgumentException e) {
+      // valid for test
+    }
+  }
+
+  @Test
+  public void testLogSortedProperties() throws Exception {
+    Mutation ignored = new ServerMutation(new Text("ignored"));
+    ignored.put(cf, cq, value);
+    Mutation m = new ServerMutation(new Text("row1"));
+    m.put(cf, cq, value);
+    ConfigurationCopy testConfig = new ConfigurationCopy(DefaultConfiguration.getInstance());
+    String sortFileCompression = "none";
+    // exercise several properties under the tserver.wal.sort.file. prefix
+    String prefix = Property.TSERV_WAL_SORT_FILE_PREFIX.toString();
+    testConfig.set(prefix + "compress.type", sortFileCompression);
+    testConfig.set(prefix + "compress.blocksize", "50K");
+    testConfig.set(prefix + "compress.blocksize.index", "56K");
+    testConfig.set(prefix + "blocksize", "256B");
+    testConfig.set(prefix + "replication", "3");
+    LogSorter sorter = new LogSorter(context, testConfig);
+
+    final String workdir = tempFolder.newFolder().getAbsolutePath();
+
+    try (var vm = VolumeManagerImpl.getLocalForTesting(workdir)) {
+      expect(context.getVolumeManager()).andReturn(vm).anyTimes();
+      expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+          .anyTimes();
+      expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+      replay(context);
+      final Path workdirPath = new Path("file://" + workdir);
+      vm.deleteRecursively(workdirPath);
+
+      KeyValue[] events =
+          {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
+              createKeyValue(COMPACTION_FINISH, 2, 1, null),
+              createKeyValue(COMPACTION_START, 4, 1, "/t1/f1"),
+              createKeyValue(COMPACTION_FINISH, 5, 1, null),
+              createKeyValue(MUTATION, 3, 1, ignored), createKeyValue(MUTATION, 5, 1, m)};
+      String dest = workdir + "/testLogSortedProperties";
+
+      List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+      int parts = 0;
+      for (KeyValue pair : events) {
+        buffer.add(new Pair<>(pair.key, pair.value));
+        if (buffer.size() >= bufferSize) {
+          sorter.writeBuffer(dest, buffer, parts++);
+          buffer.clear();
+        }
+      }
+      sorter.writeBuffer(dest, buffer, parts);
+      FileSystem fs = vm.getFileSystemByPath(workdirPath);
+
+      // check contents of directory
+      for (var file : fs.listStatus(new Path(dest))) {
+        assertTrue(file.isFile());
+        try (var fileStream = fs.open(file.getPath())) {
+          var algo = getCompressionFromRFile(fileStream, file.getLen());
+          assertEquals(sortFileCompression, algo.getName());
+        }
+      }
+    }
+  }
+
+  /**
+   * Pulled from BCFile.Reader()
+   */
+  private final Utils.Version API_VERSION_3 = new Utils.Version((short) 3, (short) 0);
+  private final String defaultPrefix = "data:";
+
+  private Compression.Algorithm getCompressionFromRFile(FSDataInputStream fsin, long fileLength)
+      throws IOException {
+    try (var in = new SeekableDataInputStream(fsin)) {
+      int magicNumberSize = 16; // BCFile.Magic.size();
+      // Move the cursor to grab the version and the magic first
+      in.seek(fileLength - magicNumberSize - Utils.Version.size());
+      var version = new Utils.Version(in);
+      assertEquals(API_VERSION_3, version);
+      in.readFully(new byte[16]); // BCFile.Magic.readAndVerify(in); // 16 bytes
+      in.seek(fileLength - magicNumberSize - Utils.Version.size() - 16); // 2 * Long.BYTES = 16
+      long offsetIndexMeta = in.readLong();
+      long offsetCryptoParameters = in.readLong();
+      assertTrue(offsetCryptoParameters > 0);
+
+      // read meta index
+      in.seek(offsetIndexMeta);
+      int count = Utils.readVInt(in);
+      assertTrue(count > 0);
+
+      String fullMetaName = Utils.readString(in);
+      if (fullMetaName != null && !fullMetaName.startsWith(defaultPrefix)) {
+        throw new IOException("Corrupted Meta region Index");
+      }
+      return Compression.getCompressionAlgorithmByName(Utils.readString(in));
+    }
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
index e66b380..4f91ec6 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
@@ -57,6 +57,8 @@ public class MultiTableRecoveryIT extends ConfigurableMacBase {
 
     // use raw local file system so walogs sync and flush will work
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    // test sorted rfile recovery options
+    cfg.setProperty(Property.TSERV_WAL_SORT_FILE_PREFIX + "compress.type", "none");
   }
 
   @Override

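For illustration, the following is a minimal, standalone sketch of the property
translation that LogSorter#extractSortedLogConfig performs in the diff above. It
uses plain Java maps instead of the Accumulo configuration API; the class name
and map-based config are hypothetical, and only the property names come from
the commit:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: properties under "tserver.wal.sort.file." are re-applied as
    // "table.file." properties in the configuration used to write the
    // sorted recovery rfiles (validation via Property.isTablePropertyValid
    // is omitted here).
    public class SortFilePropertyTranslationSketch {
      public static void main(String[] args) {
        final String sortPrefix = "tserver.wal.sort.file.";
        final String tablePrefix = "table.file.";

        Map<String,String> siteConfig = new HashMap<>();
        siteConfig.put(sortPrefix + "compress.type", "snappy");
        siteConfig.put(sortPrefix + "blocksize", "256B");
        siteConfig.put("tserver.sort.buffer.size", "10%"); // unrelated; passes through

        // start from a copy of the full config, as ConfigurationCopy does
        Map<String,String> sortedLogConf = new HashMap<>(siteConfig);
        siteConfig.forEach((key, val) -> {
          if (key.startsWith(sortPrefix)) {
            // strip the sort-file prefix and re-apply under table.file.
            sortedLogConf.put(tablePrefix + key.substring(sortPrefix.length()), val);
          }
        });

        // prints, among others:
        //   table.file.compress.type=snappy
        //   table.file.blocksize=256B
        sortedLogConf.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }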