HBASE-15538 Implement secure async protobuf wal writer

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d6fd8594
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d6fd8594
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d6fd8594

Branch: refs/heads/hbase-12439
Commit: d6fd85945130516ba10fe4854ce080e5a2329983
Parents: 9d56105
Author: zhangduo <zhang...@apache.org>
Authored: Tue Mar 29 23:02:41 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Mar 31 10:33:35 2016 +0800

----------------------------------------------------------------------
 .../wal/AbstractProtobufLogWriter.java          | 89 +++++++++++++++++++-
 .../regionserver/wal/ProtobufLogReader.java     |  1 +
 .../wal/SecureAsyncProtobufLogWriter.java       | 54 ++++++++++++
 .../wal/SecureProtobufLogReader.java            |  2 +
 .../wal/SecureProtobufLogWriter.java            | 64 ++------------
 .../hadoop/hbase/wal/AsyncFSWALProvider.java    | 27 ++++--
 .../regionserver/wal/InstrumentedLogWriter.java |  5 ++
 .../wal/TestSecureAsyncWALReplay.java           | 45 ++++++++++
 .../apache/hadoop/hbase/wal/IOTestProvider.java |  5 ++
 .../apache/hadoop/hbase/wal/TestSecureWAL.java  | 50 +++++++++--
 10 files changed, 266 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 66f1f54..66fb672 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -22,8 +22,12 @@ import static 
org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRA
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.security.Key;
+import java.security.SecureRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.crypto.spec.SecretKeySpec;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,9 +37,16 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.crypto.Cipher;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.crypto.Encryptor;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.EncryptionTest;
 import org.apache.hadoop.hbase.util.FSUtils;
 
 /**
@@ -63,10 +74,9 @@ public abstract class AbstractProtobufLogWriter {
     return WALCellCodec.create(conf, null, compressionContext);
   }
 
-  protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder 
builder)
-      throws IOException {
+  private WALHeader buildWALHeader0(Configuration conf, WALHeader.Builder 
builder) {
     if (!builder.hasWriterClsName()) {
-      builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName());
+      builder.setWriterClsName(getWriterClassName());
     }
     if (!builder.hasCellCodecClsName()) {
       builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf));
@@ -74,6 +84,60 @@ public abstract class AbstractProtobufLogWriter {
     return builder.build();
   }
 
+  protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder 
builder)
+      throws IOException {
+    return buildWALHeader0(conf, builder);
+  }
+
+  // should be called in a subclass's buildWALHeader method to build 
WALHeader for a secure
+  // environment. Do not forget to override the setEncryptor method as it will 
be called in this
+  // method to init your encryptor.
+  protected final WALHeader buildSecureWALHeader(Configuration conf, 
WALHeader.Builder builder)
+      throws IOException {
+    builder.setWriterClsName(getWriterClassName());
+    if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
+      EncryptionTest.testKeyProvider(conf);
+      EncryptionTest.testCipherProvider(conf);
+
+      // Get an instance of our cipher
+      final String cipherName =
+          conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, 
HConstants.CIPHER_AES);
+      Cipher cipher = Encryption.getCipher(conf, cipherName);
+      if (cipher == null) {
+        throw new RuntimeException("Cipher '" + cipherName + "' is not 
available");
+      }
+
+      // Generate an encryption key for this WAL
+      SecureRandom rng = new SecureRandom();
+      byte[] keyBytes = new byte[cipher.getKeyLength()];
+      rng.nextBytes(keyBytes);
+      Key key = new SecretKeySpec(keyBytes, cipher.getName());
+      builder.setEncryptionKey(ByteStringer.wrap(EncryptionUtil.wrapKey(conf,
+          conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
+              conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
+                  User.getCurrent().getShortName())),
+          key)));
+
+      // Set up the encryptor
+      Encryptor encryptor = cipher.getEncryptor();
+      encryptor.setKey(key);
+      setEncryptor(encryptor);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Initialized secure protobuf WAL: cipher=" + 
cipher.getName());
+      }
+    }
+    builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
+    return buildWALHeader0(conf, builder);
+  }
+
+  // override this if you need an encryptor
+  protected void setEncryptor(Encryptor encryptor) {
+  }
+
+  protected String getWriterClassName() {
+    return getClass().getSimpleName();
+  }
+
   private boolean initializeCompressionContext(Configuration conf, Path path) 
throws IOException {
     boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, 
false);
     if (doCompress) {
@@ -115,7 +179,7 @@ public abstract class AbstractProtobufLogWriter {
     }
   }
 
-  protected void initAfterHeader(boolean doCompress) throws IOException {
+  private void initAfterHeader0(boolean doCompress) throws IOException {
     WALCellCodec codec = getCodec(conf, this.compressionContext);
     this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
     if (doCompress) {
@@ -123,6 +187,23 @@ public abstract class AbstractProtobufLogWriter {
     }
   }
 
+  protected void initAfterHeader(boolean doCompress) throws IOException {
+    initAfterHeader0(doCompress);
+  }
+
+  // should be called in a subclass's initAfterHeader method to init 
SecureWALCellCodec.
+  protected final void secureInitAfterHeader(boolean doCompress, Encryptor 
encryptor)
+      throws IOException {
+    if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor 
!= null) {
+      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
+      this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
+      // We do not support compression
+      this.compressionContext = null;
+    } else {
+      initAfterHeader0(doCompress);
+    }
+  }
+
   void setWALTrailer(WALTrailer walTrailer) {
     this.trailer = walTrailer;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index bb25aa1..0755358 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -91,6 +91,7 @@ public class ProtobufLogReader extends ReaderBase {
   private static List<String> writerClsNames = new ArrayList<String>();
   static {
     writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
+    writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
   }
   
   // cell codec classname

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
new file mode 100644
index 0000000..f3cc41a
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.crypto.Encryptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
+
+import io.netty.channel.EventLoop;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
+
+  private Encryptor encryptor = null;
+
+  public SecureAsyncProtobufLogWriter(EventLoop eventLoop) {
+    super(eventLoop);
+  }
+
+  @Override
+  protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder 
builder)
+      throws IOException {
+    return super.buildSecureWALHeader(conf, builder);
+  }
+
+  @Override
+  protected void setEncryptor(Encryptor encryptor) {
+    this.encryptor = encryptor;
+  }
+
+  @Override
+  protected void initAfterHeader(boolean doCompress) throws IOException {
+    super.secureInitAfterHeader(doCompress, encryptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
index 0d052d4..9af5a0d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
@@ -48,6 +48,8 @@ public class SecureProtobufLogReader extends 
ProtobufLogReader {
   static {
     writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
     writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName());
+    writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
+    writerClsNames.add(SecureAsyncProtobufLogWriter.class.getSimpleName());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
index c352770..6d1283e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
@@ -19,81 +19,31 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
-import java.security.Key;
-import java.security.SecureRandom;
-
-import javax.crypto.spec.SecretKeySpec;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.EncryptionTest;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.crypto.Cipher;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.crypto.Encryptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.security.User;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class SecureProtobufLogWriter extends ProtobufLogWriter {
 
-  private static final Log LOG = 
LogFactory.getLog(SecureProtobufLogWriter.class);
   private Encryptor encryptor = null;
 
   @Override
   protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder 
builder)
       throws IOException {
-    builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName());
-    if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
-      EncryptionTest.testKeyProvider(conf);
-      EncryptionTest.testCipherProvider(conf);
-
-      // Get an instance of our cipher
-      final String cipherName =
-          conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, 
HConstants.CIPHER_AES);
-      Cipher cipher = Encryption.getCipher(conf, cipherName);
-      if (cipher == null) {
-        throw new RuntimeException("Cipher '" + cipherName + "' is not 
available");
-      }
-
-      // Generate an encryption key for this WAL
-      SecureRandom rng = new SecureRandom();
-      byte[] keyBytes = new byte[cipher.getKeyLength()];
-      rng.nextBytes(keyBytes);
-      Key key = new SecretKeySpec(keyBytes, cipher.getName());
-      builder.setEncryptionKey(ByteStringer.wrap(EncryptionUtil.wrapKey(conf,
-          conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
-              conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
-                  User.getCurrent().getShortName())),
-          key)));
-
-      // Set up the encryptor
-      encryptor = cipher.getEncryptor();
-      encryptor.setKey(key);
+    return super.buildSecureWALHeader(conf, builder);
+  }
 
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Initialized secure protobuf WAL: cipher=" + 
cipher.getName());
-      }
-    }
-    builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
-    return super.buildWALHeader(conf, builder);
+  @Override
+  protected void setEncryptor(Encryptor encryptor) {
+    this.encryptor = encryptor;
   }
 
   @Override
   protected void initAfterHeader(boolean doCompress) throws IOException {
-    if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor 
!= null) {
-      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
-      this.cellEncoder = codec.getEncoder(this.output);
-      // We do not support compression
-      this.compressionContext = null;
-    } else {
-      super.initAfterHeader(doCompress);
-    }
+    super.secureInitAfterHeader(doCompress, encryptor);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index bc142ce..38002fb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -17,8 +17,14 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,10 +36,6 @@ import 
org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
-import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-
 /**
  * A WAL provider that use {@link AsyncFSWAL}.
  */
@@ -41,6 +43,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
 @InterfaceStability.Evolving
 public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
 
+  private static final Log LOG = LogFactory.getLog(AsyncFSWALProvider.class);
+
   // Only public so classes back in regionserver.wal can access
   public interface AsyncWriter extends WALProvider.AsyncWriter {
     void init(FileSystem fs, Path path, Configuration c, boolean overwritable) 
throws IOException;
@@ -66,8 +70,17 @@ public class AsyncFSWALProvider extends 
AbstractFSWALProvider<AsyncFSWAL> {
    */
   public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem 
fs, Path path,
       boolean overwritable, EventLoop eventLoop) throws IOException {
-    AsyncWriter writer = new AsyncProtobufLogWriter(eventLoop);
-    writer.init(fs, path, conf, overwritable);
-    return writer;
+    // Configuration already does caching for the Class lookup.
+    Class<? extends AsyncWriter> logWriterClass =
+        conf.getClass("hbase.regionserver.hlog.async.writer.impl", 
AsyncProtobufLogWriter.class,
+          AsyncWriter.class);
+    try {
+      AsyncWriter writer = 
logWriterClass.getConstructor(EventLoop.class).newInstance(eventLoop);
+      writer.init(fs, path, conf, overwritable);
+      return writer;
+    } catch (Exception e) {
+      LOG.debug("Error instantiating log writer.", e);
+      throw new IOException("cannot get log writer", e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
index d7a4618..2aebf2b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
@@ -29,6 +29,11 @@ public class InstrumentedLogWriter extends ProtobufLogWriter 
{
     super();
   }
 
+  @Override
+  protected String getWriterClassName() {
+    return ProtobufLogWriter.class.getSimpleName();
+  }
+
   public static boolean activateFailure = false;
   @Override
     public void append(Entry entry) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
new file mode 100644
index 0000000..73c0216
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestSecureAsyncWALReplay extends TestAsyncWALReplay {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, 
KeyProviderForTesting.class.getName());
+    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
+    conf.setClass("hbase.regionserver.hlog.reader.impl", 
SecureProtobufLogReader.class,
+      Reader.class);
+    conf.setClass("hbase.regionserver.hlog.async.writer.impl", 
SecureAsyncProtobufLogWriter.class,
+      AsyncWriter.class);
+    conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
+    TestAsyncWALReplay.setUpBeforeClass();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index 6b1ca03..7664bfa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -217,6 +217,11 @@ public class IOTestProvider implements WALProvider {
     }
 
     @Override
+    protected String getWriterClassName() {
+      return ProtobufLogWriter.class.getSimpleName();
+    }
+
+    @Override
     public void append(Entry entry) throws IOException {
       if (doAppends) {
         super.append(entry);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d6fd8594/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
index caa0a45..984a560 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -21,12 +21,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -50,19 +51,39 @@ import 
org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
-@Category({RegionServerTests.class, MediumTests.class})
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, MediumTests.class })
 public class TestSecureWAL {
-  private static final Log LOG = LogFactory.getLog(TestSecureWAL.class);
+
   static {
     
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
       .getLogger().setLevel(Level.ALL);
   };
   static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
+  @Rule
+  public TestName name = new TestName();
+
+  @Parameter
+  public String walProvider;
+
+  @Parameters(name = "{index}: provider={0}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { 
"asyncfs" });
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
@@ -72,13 +93,26 @@ public class TestSecureWAL {
       WAL.Reader.class);
     conf.setClass("hbase.regionserver.hlog.writer.impl", 
SecureProtobufLogWriter.class,
       WALProvider.Writer.class);
+    conf.setClass("hbase.regionserver.hlog.async.writer.impl", 
SecureAsyncProtobufLogWriter.class,
+      WALProvider.AsyncWriter.class);
     conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
-    FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
+    FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() {
+    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
   }
 
   @Test
   public void testSecureWAL() throws Exception {
-    TableName tableName = TableName.valueOf("TestSecureWAL");
+    TableName tableName = 
TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.addFamily(new HColumnDescriptor(tableName.getName()));
     NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
@@ -92,8 +126,9 @@ public class TestSecureWAL {
     final byte[] row = Bytes.toBytes("row");
     final byte[] family = Bytes.toBytes("family");
     final byte[] value = Bytes.toBytes("Test value");
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
-    final WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, 
"TestSecureWAL");
+    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    final WALFactory wals =
+        new WALFactory(TEST_UTIL.getConfiguration(), null, 
tableName.getNameAsString());
 
     // Write the WAL
     final WAL wal =
@@ -137,5 +172,4 @@ public class TestSecureWAL {
     assertEquals("Should have read back as many KVs as written", total, count);
     reader.close();
   }
-
 }

Reply via email to