This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-4298
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 08dda9ade16b0befb2e643d64906337cd2fa7e34
Author: Elek Márton <[email protected]>
AuthorDate: Mon Oct 5 12:30:11 2020 +0200

    move out unsafeByteBufferConversion from the new interface
---
 .../apache/hadoop/hdds/scm/XceiverClientFactory.java   |  6 ------
 .../apache/hadoop/hdds/scm/XceiverClientManager.java   |  7 -------
 .../org/apache/hadoop/hdds/scm/storage/BufferPool.java |  2 +-
 .../apache/hadoop/hdds/scm/ByteStringConversion.java   | 18 +++++++-----------
 .../ozone/container/keyvalue/KeyValueHandler.java      | 14 +++++++++++---
 .../ozone/client/io/BlockOutputStreamEntryPool.java    | 11 ++++++++---
 .../apache/hadoop/ozone/client/io/KeyOutputStream.java | 14 +++++++++++---
 .../org/apache/hadoop/ozone/client/rpc/RpcClient.java  |  6 ++++++
 8 files changed, 44 insertions(+), 34 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java
index 184645d..dc35cd5 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java
@@ -18,20 +18,14 @@
 package org.apache.hadoop.hdds.scm;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.function.Function;
 
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
 /**
  * Interface to provide XceiverClient when needed.
  */
 public interface XceiverClientFactory {
 
-  Function<ByteBuffer, ByteString> byteBufferToByteStringConversion();
-
   XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException;
 
   void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient);
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index e07a5d2..eaf0503 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -20,12 +20,10 @@ package org.apache.hadoop.hdds.scm;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -49,7 +47,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
 import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -277,10 +274,6 @@ public class XceiverClientManager implements Closeable, 
XceiverClientFactory {
     }
   }
 
-  public Function<ByteBuffer, ByteString> byteBufferToByteStringConversion(){
-    return ByteStringConversion.createByteBufferConversion(conf);
-  }
-
   /**
    * Get xceiver client metric.
    */
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index dc27d4b..94fa87a 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -42,7 +42,7 @@ public class BufferPool {
 
   public BufferPool(int bufferSize, int capacity) {
     this(bufferSize, capacity,
-        ByteStringConversion.createByteBufferConversion(null));
+        ByteStringConversion.createByteBufferConversion(false));
   }
 
   public BufferPool(int bufferSize, int capacity,
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java
index dc44392..b5f6e48 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hdds.scm;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import java.nio.ByteBuffer;
+import java.util.function.Function;
+
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
-import java.nio.ByteBuffer;
-import java.util.function.Function;
-
 /**
  * Helper class to create a conversion function from ByteBuffer to ByteString
  * based on the property
@@ -38,17 +38,13 @@ public final class ByteStringConversion {
    * Creates the conversion function to be used to convert ByteBuffers to
    * ByteString instances to be used in protobuf messages.
    *
-   * @param config the Ozone configuration
    * @return the conversion function defined by
-   *          {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED}
+   * {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED}
    * @see ByteBuffer
    */
   public static Function<ByteBuffer, ByteString> createByteBufferConversion(
-      ConfigurationSource config){
-    boolean unsafeEnabled =
-        config!=null && config.getBoolean(
-            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
-            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
+      boolean unsafeEnabled
+  ) {
     if (unsafeEnabled) {
       return UnsafeByteOperations::unsafeWrap;
     } else {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index e0de6ff..70f4ffc 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -47,6 +47,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -127,14 +128,21 @@ public class KeyValueHandler extends Handler {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    maxContainerSize = (long)config.getStorageSize(
+    maxContainerSize = (long) config.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-            ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     // this handler lock is used for synchronizing createContainer Requests,
     // so using a fair lock here.
     containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
+
+    boolean isUnsafeByteBufferConversionEnabled =
+        conf.getBoolean(
+            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
+            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
+
     byteBufferToByteString =
-        ByteStringConversion.createByteBufferConversion(conf);
+        ByteStringConversion
+            .createByteBufferConversion(isUnsafeByteBufferConversionEnabled);
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index c8effd8..71784c5 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -25,6 +25,7 @@ import java.util.ListIterator;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -73,7 +74,8 @@ public class BlockOutputStreamEntryPool {
   private final ExcludeList excludeList;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
+  public BlockOutputStreamEntryPool(
+      OzoneManagerProtocol omClient,
       int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
       HddsProtos.ReplicationType type,
       int bufferSize, long bufferFlushSize,
@@ -81,7 +83,9 @@ public class BlockOutputStreamEntryPool {
       long size, long watchTimeout, ContainerProtos.ChecksumType checksumType,
       int bytesPerChecksum, String uploadID, int partNumber,
       boolean isMultipart, OmKeyInfo info,
-      XceiverClientFactory xceiverClientFactory, long openID) {
+      boolean unsafeByteBufferConversion,
+      XceiverClientFactory xceiverClientFactory, long openID
+  ) {
     streamEntries = new ArrayList<>();
     currentStreamIndex = 0;
     this.omClient = omClient;
@@ -122,7 +126,8 @@ public class BlockOutputStreamEntryPool {
     this.bufferPool =
         new BufferPool(streamBufferSize,
             (int) (streamBufferMaxSize / streamBufferSize),
-            xceiverClientFactory.byteBufferToByteStringConversion());
+            ByteStringConversion
+                .createByteBufferConversion(unsafeByteBufferConversion));
   }
 
   /**
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index c65c264..03cdb72 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -134,14 +134,16 @@ public class KeyOutputStream extends OutputStream {
       long bufferMaxSize, long size, long watchTimeout,
       ChecksumType checksumType, int bytesPerChecksum,
       String uploadID, int partNumber, boolean isMultipart,
-      int maxRetryCount, long retryInterval) {
+      int maxRetryCount, long retryInterval,
+      boolean unsafeByteBufferConversion) {
     OmKeyInfo info = handler.getKeyInfo();
     blockOutputStreamEntryPool =
         new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
             type, bufferSize, bufferFlushSize, isBufferFlushDelay,
             bufferMaxSize, size,
             watchTimeout, checksumType, bytesPerChecksum, uploadID, partNumber,
-            isMultipart, info, xceiverClientManager, handler.getId());
+            isMultipart, info, unsafeByteBufferConversion,
+            xceiverClientManager, handler.getId());
     // Retrieve the file encryption key info, null if file is not in
     // encrypted bucket.
     this.feInfo = info.getFileEncryptionInfo();
@@ -560,6 +562,7 @@ public class KeyOutputStream extends OutputStream {
     private boolean isMultipartKey;
     private int maxRetryCount;
     private long retryInterval;
+    private boolean unsafeByteBufferConversion;
 
     public Builder setMultipartUploadID(String uploadID) {
       this.multipartUploadID = uploadID;
@@ -656,6 +659,11 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder enableUnsafeByteBufferConversion(boolean enabled) {
+      this.unsafeByteBufferConversion = enabled;
+      return this;
+    }
+
     public KeyOutputStream build() {
       return new KeyOutputStream(openHandler, xceiverManager, omClient,
           chunkSize, requestID, factor, type,
@@ -663,7 +671,7 @@ public class KeyOutputStream extends OutputStream {
           streamBufferMaxSize,
           blockSize, watchTimeout, checksumType,
           bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
-          maxRetryCount, retryInterval);
+          maxRetryCount, retryInterval, unsafeByteBufferConversion);
     }
   }
 
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 94fbe52..c61d0eb 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -132,6 +132,7 @@ public class RpcClient implements ClientProtocol {
   private final int chunkSize;
   private final ChecksumType checksumType;
   private final int bytesPerChecksum;
+  private final boolean unsafeByteBufferConversion;
   private boolean verifyChecksum;
   private final UserGroupInformation ugi;
   private final ACLType userRights;
@@ -211,6 +212,10 @@ public class RpcClient implements ClientProtocol {
     blockSize = (long) 
conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
         OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
 
+    unsafeByteBufferConversion = conf.getBoolean(
+            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
+            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
+        
     int configuredChecksumSize = (int) conf.getStorageSize(
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
@@ -1279,6 +1284,7 @@ public class RpcClient implements ClientProtocol {
             .setBytesPerChecksum(bytesPerChecksum)
             .setMaxRetryCount(maxRetryCount)
             .setRetryInterval(retryInterval)
+            .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
             .build();
     keyOutputStream
         .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to