This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-4298
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 08dda9ade16b0befb2e643d64906337cd2fa7e34
Author: Elek Márton <[email protected]>
AuthorDate: Mon Oct 5 12:30:11 2020 +0200

    move out unsafeByteBufferConversion from the new interface
---
 .../apache/hadoop/hdds/scm/XceiverClientFactory.java   |  6 ------
 .../apache/hadoop/hdds/scm/XceiverClientManager.java   |  7 -------
 .../org/apache/hadoop/hdds/scm/storage/BufferPool.java |  2 +-
 .../apache/hadoop/hdds/scm/ByteStringConversion.java   | 18 +++++++-----------
 .../ozone/container/keyvalue/KeyValueHandler.java      | 14 +++++++++++---
 .../ozone/client/io/BlockOutputStreamEntryPool.java    | 11 ++++++++---
 .../apache/hadoop/ozone/client/io/KeyOutputStream.java | 14 +++++++++++---
 .../org/apache/hadoop/ozone/client/rpc/RpcClient.java  |  6 ++++++
 8 files changed, 44 insertions(+), 34 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java
index 184645d..dc35cd5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientFactory.java
@@ -18,20 +18,14 @@
 package org.apache.hadoop.hdds.scm;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.function.Function;
 
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
 /**
  * Interface to provide XceiverClient when needed.
  */
 public interface XceiverClientFactory {
 
-  Function<ByteBuffer, ByteString> byteBufferToByteStringConversion();
-
   XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException;
 
   void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index e07a5d2..eaf0503 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -20,12 +20,10 @@ package org.apache.hadoop.hdds.scm;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -49,7 +47,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -277,10 +274,6 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
     }
   }
 
-  public Function<ByteBuffer, ByteString> byteBufferToByteStringConversion(){
-    return ByteStringConversion.createByteBufferConversion(conf);
-  }
-
   /**
    * Get xceiver client metric.
   */
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index dc27d4b..94fa87a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -42,7 +42,7 @@ public class BufferPool {
 
   public BufferPool(int bufferSize, int capacity) {
     this(bufferSize, capacity,
-        ByteStringConversion.createByteBufferConversion(null));
+        ByteStringConversion.createByteBufferConversion(false));
   }
 
   public BufferPool(int bufferSize, int capacity,
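A side note on the BufferPool change above: the two-argument constructor keeps its old behaviour. Passing null before, and false now, both select the safe, copying conversion; the flag has merely become explicit. A minimal sketch of a caller relying on that default (the class name and sizes below are invented for illustration):

    import org.apache.hadoop.hdds.scm.storage.BufferPool;

    /** Hypothetical caller; sizes are illustrative, not Ozone defaults. */
    public final class BufferPoolDefaultExample {
      public static BufferPool defaultPool() {
        // Delegates to ByteStringConversion.createByteBufferConversion(false),
        // so every ByteBuffer is converted to a ByteString by copying.
        return new BufferPool(4 * 1024 * 1024, 8); // 4 MB buffers, 8 of them
      }
    }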
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java
index dc44392..b5f6e48 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hdds.scm;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import java.nio.ByteBuffer;
+import java.util.function.Function;
+
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
-import java.nio.ByteBuffer;
-import java.util.function.Function;
-
 /**
  * Helper class to create a conversion function from ByteBuffer to ByteString
  * based on the property
@@ -38,17 +38,13 @@ public final class ByteStringConversion {
    * Creates the conversion function to be used to convert ByteBuffers to
    * ByteString instances to be used in protobuf messages.
    *
-   * @param config the Ozone configuration
    * @return the conversion function defined by
-   *         {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED}
+   *     {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED}
    * @see ByteBuffer
    */
   public static Function<ByteBuffer, ByteString> createByteBufferConversion(
-      ConfigurationSource config){
-    boolean unsafeEnabled =
-        config!=null && config.getBoolean(
-            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
-            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
+      boolean unsafeEnabled
+  ) {
     if (unsafeEnabled) {
       return UnsafeByteOperations::unsafeWrap;
     } else {
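With the ConfigurationSource parameter gone, call sites resolve the flag themselves and hand a plain boolean to the helper. Roughly, assuming a ConfigurationSource named conf is at hand (the wrapper class below is hypothetical; the constants and the helper are the real ones from this diff):

    import java.nio.ByteBuffer;
    import java.util.function.Function;

    import org.apache.hadoop.hdds.conf.ConfigurationSource;
    import org.apache.hadoop.hdds.scm.ByteStringConversion;
    import org.apache.hadoop.ozone.OzoneConfigKeys;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

    /** Hypothetical helper showing the new call pattern. */
    public final class ConversionUsageExample {
      public static Function<ByteBuffer, ByteString> fromConf(
          ConfigurationSource conf) {
        boolean unsafeEnabled = conf.getBoolean(
            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
        // true  -> UnsafeByteOperations::unsafeWrap, zero-copy; the buffer
        //          must not be mutated while the ByteString is in use.
        // false -> the safe conversion, which copies the buffer contents.
        return ByteStringConversion.createByteBufferConversion(unsafeEnabled);
      }
    }

Note that the old null-tolerant behaviour (a null config meant "safe") is gone; callers without configuration simply pass false, as BufferPool does above.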
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index e0de6ff..70f4ffc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -127,14 +128,21 @@ public class KeyValueHandler extends Handler {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    maxContainerSize = (long)config.getStorageSize(
+    maxContainerSize = (long) config.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     // this handler lock is used for synchronizing createContainer Requests,
     // so using a fair lock here.
     containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
+
+    boolean isUnsafeByteBufferConversionEnabled =
+        conf.getBoolean(
+            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
+            OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
+
     byteBufferToByteString =
-        ByteStringConversion.createByteBufferConversion(conf);
+        ByteStringConversion
+            .createByteBufferConversion(isUnsafeByteBufferConversionEnabled);
   }
 
   @VisibleForTesting
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index c8effd8..71784c5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -25,6 +25,7 @@ import java.util.ListIterator;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -73,7 +74,8 @@ public class BlockOutputStreamEntryPool {
   private final ExcludeList excludeList;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
+  public BlockOutputStreamEntryPool(
+      OzoneManagerProtocol omClient,
       int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
       HddsProtos.ReplicationType type, int bufferSize, long bufferFlushSize,
@@ -81,7 +83,9 @@ public class BlockOutputStreamEntryPool {
       long size, long watchTimeout, ContainerProtos.ChecksumType checksumType,
       int bytesPerChecksum, String uploadID, int partNumber,
       boolean isMultipart, OmKeyInfo info,
-      XceiverClientFactory xceiverClientFactory, long openID) {
+      boolean unsafeByteBufferConversion,
+      XceiverClientFactory xceiverClientFactory, long openID
+  ) {
     streamEntries = new ArrayList<>();
     currentStreamIndex = 0;
     this.omClient = omClient;
@@ -122,7 +126,8 @@ public class BlockOutputStreamEntryPool {
     this.bufferPool =
         new BufferPool(streamBufferSize,
             (int) (streamBufferMaxSize / streamBufferSize),
-            xceiverClientFactory.byteBufferToByteStringConversion());
+            ByteStringConversion
+                .createByteBufferConversion(unsafeByteBufferConversion));
   }
 
   /**
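On the client write path the flag now travels as a plain boolean down to the BufferPool instead of being pulled from the XceiverClientFactory; the next two files thread it through. A trimmed sketch of that wiring (the wrapper class is illustrative; the three-argument BufferPool constructor is the real one):

    import org.apache.hadoop.hdds.scm.ByteStringConversion;
    import org.apache.hadoop.hdds.scm.storage.BufferPool;

    /** Illustrative wiring; only the conversion-related part is real. */
    public final class PoolWiringExample {
      public static BufferPool newPool(int streamBufferSize,
          long streamBufferMaxSize, boolean unsafeByteBufferConversion) {
        // The pool no longer asks XceiverClientFactory for the conversion;
        // it is built directly from the boolean the caller threaded in.
        return new BufferPool(
            streamBufferSize,
            (int) (streamBufferMaxSize / streamBufferSize),
            ByteStringConversion
                .createByteBufferConversion(unsafeByteBufferConversion));
      }
    }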
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index c65c264..03cdb72 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -134,14 +134,16 @@ public class KeyOutputStream extends OutputStream {
       long bufferMaxSize, long size, long watchTimeout,
       ChecksumType checksumType, int bytesPerChecksum, String uploadID,
       int partNumber, boolean isMultipart,
-      int maxRetryCount, long retryInterval) {
+      int maxRetryCount, long retryInterval,
+      boolean unsafeByteBufferConversion) {
     OmKeyInfo info = handler.getKeyInfo();
     blockOutputStreamEntryPool =
         new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
             type, bufferSize, bufferFlushSize, isBufferFlushDelay,
             bufferMaxSize, size, watchTimeout, checksumType,
             bytesPerChecksum, uploadID, partNumber,
-            isMultipart, info, xceiverClientManager, handler.getId());
+            isMultipart, info, unsafeByteBufferConversion,
+            xceiverClientManager, handler.getId());
     // Retrieve the file encryption key info, null if file is not in
     // encrypted bucket.
     this.feInfo = info.getFileEncryptionInfo();
@@ -560,6 +562,7 @@ public class KeyOutputStream extends OutputStream {
     private boolean isMultipartKey;
     private int maxRetryCount;
     private long retryInterval;
+    private boolean unsafeByteBufferConversion;
 
     public Builder setMultipartUploadID(String uploadID) {
       this.multipartUploadID = uploadID;
       return this;
@@ -656,6 +659,11 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder enableUnsafeByteBufferConversion(boolean enabled) {
+      this.unsafeByteBufferConversion = enabled;
+      return this;
+    }
+
     public KeyOutputStream build() {
       return new KeyOutputStream(openHandler, xceiverManager, omClient,
           chunkSize, requestID, factor, type,
@@ -663,7 +671,7 @@ public class KeyOutputStream extends OutputStream {
           streamBufferMaxSize, blockSize, watchTimeout, checksumType,
           bytesPerChecksum, multipartUploadID, multipartNumber,
           isMultipartKey,
-          maxRetryCount, retryInterval);
+          maxRetryCount, retryInterval, unsafeByteBufferConversion);
     }
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 94fbe52..c61d0eb 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -132,6 +132,7 @@ public class RpcClient implements ClientProtocol {
   private final int chunkSize;
   private final ChecksumType checksumType;
   private final int bytesPerChecksum;
+  private final boolean unsafeByteBufferConversion;
   private boolean verifyChecksum;
   private final UserGroupInformation ugi;
   private final ACLType userRights;
@@ -211,6 +212,10 @@ public class RpcClient implements ClientProtocol {
     blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
         OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
 
+    unsafeByteBufferConversion = conf.getBoolean(
+        OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
+        OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
+
     int configuredChecksumSize = (int) conf.getStorageSize(
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
@@ -1279,6 +1284,7 @@ public class RpcClient implements ClientProtocol {
         .setBytesPerChecksum(bytesPerChecksum)
         .setMaxRetryCount(maxRetryCount)
         .setRetryInterval(retryInterval)
+        .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
         .build();
     keyOutputStream
         .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
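End to end, RpcClient reads the flag once and hands it to KeyOutputStream.Builder. A trimmed view of the call site, reduced to the setters visible in this diff (a real call site configures many more parameters, and the Builder instantiation here is assumed):

    import org.apache.hadoop.ozone.client.io.KeyOutputStream;

    /** Trimmed, hypothetical view of the RpcClient call site. */
    public final class BuilderUsageExample {
      public static KeyOutputStream open(int maxRetryCount,
          long retryInterval, boolean unsafeByteBufferConversion) {
        // A real call site also sets the open handler, client manager,
        // checksum settings and so on before calling build().
        return new KeyOutputStream.Builder()
            .setMaxRetryCount(maxRetryCount)
            .setRetryInterval(retryInterval)
            .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
            .build();
      }
    }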
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]