This is an automated email from the ASF dual-hosted git repository. shashikant pushed a commit to branch revert-472-HDDS-2920 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 145ae53b651405d8f7d8a6fef43309ed47878c3e Author: bshashikant <[email protected]> AuthorDate: Thu Jan 23 14:44:32 2020 +0530 Revert "HDDS-2920. Remove ozone ratis client specific config keys. (#472)" This reverts commit ab557dba89e4666e8c2679fa9b5fbd95dfd45e0b. --- .../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 2 +- .../hadoop/hdds/scm/XceiverClientManager.java | 42 +-------------- .../apache/hadoop/hdds/scm/XceiverClientRatis.java | 29 ++++++++--- .../hadoop/hdds/scm/storage/BlockOutputStream.java | 6 ++- .../hadoop/hdds/scm/storage/CommitWatcher.java | 8 ++- .../org/apache/hadoop/hdds/ratis/RatisHelper.java | 46 +++++++++++++---- .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 ++- .../apache/hadoop/hdds/scm/XceiverClientSpi.java | 3 +- .../org/apache/hadoop/ozone/OzoneConfigKeys.java | 11 ++++ .../common/src/main/resources/ozone-default.xml | 16 ++++++ .../ozone/client/io/BlockOutputStreamEntry.java | 2 +- .../hadoop/ozone/client/io/KeyOutputStream.java | 5 ++ .../apache/hadoop/ozone/client/rpc/RpcClient.java | 8 +++ .../org/apache/hadoop/ozone/RatisTestHelper.java | 9 +++- .../ozone/client/rpc/Test2WayCommitInRatis.java | 4 +- .../ozone/client/rpc/TestBlockOutputStream.java | 1 + .../rpc/TestBlockOutputStreamWithFailures.java | 1 + .../rpc/TestCloseContainerHandlingByClient.java | 1 + .../hadoop/ozone/client/rpc/TestCommitWatcher.java | 5 +- .../client/rpc/TestFailureHandlingByClient.java | 2 + .../ozone/client/rpc/TestKeyInputStream.java | 1 + .../rpc/TestMultiBlockWritesWithDnFailures.java | 2 + .../rpc/TestOzoneClientRetriesOnException.java | 1 + .../hadoop/ozone/client/rpc/TestReadRetries.java | 2 +- .../ozone/client/rpc/TestWatchForCommit.java | 60 ++++++++++++++++++++-- .../hadoop/ozone/freon/TestDataValidate.java | 2 + .../ozone/freon/TestOzoneClientKeyGenerator.java | 2 + .../hadoop/ozone/freon/TestRandomKeyGenerator.java | 2 + .../hadoop/ozone/freon/DatanodeChunkGenerator.java | 2 +- 29 files changed, 206 insertions(+), 76 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 0ded84a..9a4da38 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -473,7 +473,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { } @Override - public XceiverClientReply watchForCommit(long index) + public XceiverClientReply watchForCommit(long index, long timeout) throws InterruptedException, ExecutionException, TimeoutException, IOException { // there is no notion of watch for commit index in standalone pipeline diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index dc3b215..d46456a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -342,12 +342,11 @@ public class XceiverClientManager implements Closeable { /** * Configuration for ratis client. */ - @ConfigGroup(prefix = "raft.client") + @ConfigGroup(prefix = "dfs.ratis.client") public static class DFSRatisClientConfig { - @Config(key = "async.outstanding-requests.max", + @Config(key = "async.max.outstanding.requests", defaultValue = "64", - type = ConfigType.INT, tags = {OZONE, CLIENT, PERFORMANCE}, description = "Controls the maximum number of outstanding async requests that can" @@ -362,43 +361,6 @@ public class XceiverClientManager implements Closeable { public void setMaxOutstandingRequests(int maxOutstandingRequests) { this.maxOutstandingRequests = maxOutstandingRequests; } - - @Config(key = "rpc.request.timeout", - defaultValue = "60s", - type = ConfigType.TIME, - tags = {OZONE, CLIENT, PERFORMANCE}, - description = "The timeout duration for ratis client request (except " + - "for watch request). It should be set greater than leader " + - "election timeout in Ratis." - ) - private long requestTimeOut = 60 * 1000; - - public long getRequestTimeOut() { - return requestTimeOut; - } - - public void setRequestTimeOut(long requestTimeOut) { - this.requestTimeOut = requestTimeOut; - } - - @Config(key = "watch.request.timeout", - defaultValue = "180s", - type = ConfigType.TIME, - tags = {OZONE, CLIENT, PERFORMANCE}, - description = "The timeout duration for ratis client watch request. " + - "Timeout for the watch API in Ratis client to acknowledgea " + - "particular request getting replayed to all servers." - ) - private long watchRequestTimeOut = 180 * 1000; - - public long getWatchRequestTimeOut() { - return watchRequestTimeOut; - } - - public void setWatchRequestTimeOut(long watchRequestTimeOut) { - this.watchRequestTimeOut = watchRequestTimeOut; - } } - } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 0d12355..6f102d8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -58,6 +59,7 @@ import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,19 +89,25 @@ public final class XceiverClientRatis extends XceiverClientSpi { final String rpcType = ozoneConf .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final TimeDuration clientRequestTimeout = + RatisHelper.getClientRequestTimeout(ozoneConf); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(ozoneConf); final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf); final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new SecurityConfig(ozoneConf), caCert); return new XceiverClientRatis(pipeline, - SupportedRpcType.valueOfIgnoreCase(rpcType), - retryPolicy, tlsConfig, ozoneConf); + SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests, + retryPolicy, tlsConfig, clientRequestTimeout, ozoneConf); } private final Pipeline pipeline; private final RpcType rpcType; private final AtomicReference<RaftClient> client = new AtomicReference<>(); + private final int maxOutstandingRequests; private final RetryPolicy retryPolicy; private final GrpcTlsConfig tlsConfig; + private final TimeDuration clientRequestTimeout; private final Configuration ozoneConfiguration; // Map to track commit index at every server @@ -111,14 +119,17 @@ public final class XceiverClientRatis extends XceiverClientSpi { * Constructs a client. */ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, - RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, + int maxOutStandingChunks, RetryPolicy retryPolicy, + GrpcTlsConfig tlsConfig, TimeDuration timeout, Configuration configuration) { super(); this.pipeline = pipeline; this.rpcType = rpcType; + this.maxOutstandingRequests = maxOutStandingChunks; this.retryPolicy = retryPolicy; commitInfoMap = new ConcurrentHashMap<>(); this.tlsConfig = tlsConfig; + this.clientRequestTimeout = timeout; metrics = XceiverClientManager.getXceiverClientMetrics(); this.ozoneConfiguration = configuration; } @@ -170,7 +181,8 @@ public final class XceiverClientRatis extends XceiverClientSpi { if (!client.compareAndSet(null, RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy, - tlsConfig, ozoneConfiguration))) { + maxOutstandingRequests, tlsConfig, clientRequestTimeout, + ozoneConfiguration))) { throw new IllegalStateException("Client is already connected."); } } @@ -244,7 +256,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { } @Override - public XceiverClientReply watchForCommit(long index) + public XceiverClientReply watchForCommit(long index, long timeout) throws InterruptedException, ExecutionException, TimeoutException, IOException { long commitIndex = getReplicatedMinCommitIndex(); @@ -255,11 +267,14 @@ public final class XceiverClientRatis extends XceiverClientSpi { clientReply.setLogIndex(commitIndex); return clientReply; } + if (LOG.isDebugEnabled()) { + LOG.debug("commit index : {} watch timeout : {}", index, timeout); + } RaftClientReply reply; try { CompletableFuture<RaftClientReply> replyFuture = getClient() .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); - replyFuture.get(); + replyFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (Exception e) { Throwable t = HddsClientUtils.checkForException(e); LOG.warn("3 way commit failed on pipeline {}", pipeline, e); @@ -268,7 +283,7 @@ public final class XceiverClientRatis extends XceiverClientSpi { } reply = getClient() .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) - .get(); + .get(timeout, TimeUnit.MILLISECONDS); List<RaftProtos.CommitInfoProto> commitInfoProtoList = reply.getCommitInfos().stream() .filter(i -> i.getCommitIndex() < index) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 9131f5c..15aebe1 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -125,6 +125,7 @@ public class BlockOutputStream extends OutputStream { * @param bufferPool pool of buffers * @param streamBufferFlushSize flush size * @param streamBufferMaxSize max size of the currentBuffer + * @param watchTimeout watch timeout * @param checksumType checksum type * @param bytesPerChecksum Bytes per checksum */ @@ -132,7 +133,8 @@ public class BlockOutputStream extends OutputStream { public BlockOutputStream(BlockID blockID, XceiverClientManager xceiverClientManager, Pipeline pipeline, int chunkSize, long streamBufferFlushSize, long streamBufferMaxSize, - BufferPool bufferPool, ChecksumType checksumType, int bytesPerChecksum) + long watchTimeout, BufferPool bufferPool, ChecksumType checksumType, + int bytesPerChecksum) throws IOException { this.blockID = new AtomicReference<>(blockID); this.chunkSize = chunkSize; @@ -152,7 +154,7 @@ public class BlockOutputStream extends OutputStream { // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); - commitWatcher = new CommitWatcher(bufferPool, xceiverClient); + commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout); bufferList = null; totalDataFlushedLength = 0; writtenDataLength = 0; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java index 34d0d7c..ebcc6dc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java @@ -71,13 +71,17 @@ public class CommitWatcher { private XceiverClientSpi xceiverClient; + private final long watchTimeout; + // total data which has been successfully flushed and acknowledged // by all servers private long totalAckDataLength; - public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) { + public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient, + long watchTimeout) { this.bufferPool = bufferPool; this.xceiverClient = xceiverClient; + this.watchTimeout = watchTimeout; commitIndex2flushedDataMap = new ConcurrentSkipListMap<>(); totalAckDataLength = 0; futureMap = new ConcurrentHashMap<>(); @@ -187,7 +191,7 @@ public class CommitWatcher { long index; try { XceiverClientReply reply = - xceiverClient.watchForCommit(commitIndex); + xceiverClient.watchForCommit(commitIndex, watchTimeout); if (reply == null) { index = 0; } else { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index 75e07c3..98c36b6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcFactory; @@ -148,12 +149,26 @@ public interface RatisHelper { } static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline, - RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, + RetryPolicy retryPolicy, int maxOutStandingRequest, + GrpcTlsConfig tlsConfig, TimeDuration timeout, Configuration ozoneConfiguration) throws IOException { return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeaderNode()), newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()), - pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration); + pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig, + timeout, ozoneConfiguration); + } + + static TimeDuration getClientRequestTimeout(Configuration conf) { + // Set the client requestTimeout + final TimeUnit timeUnit = + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT + .getUnit(); + final long duration = conf.getTimeDuration( + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY, + OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT + .getDuration(), timeUnit); + return TimeDuration.valueOf(duration, timeUnit); } static RpcType getRpcType(Configuration conf) { @@ -163,30 +178,35 @@ public interface RatisHelper { } static RaftClient newRaftClient(RaftPeer leader, Configuration conf) { - return newRaftClient(getRpcType(conf), leader, - RatisHelper.createRetryPolicy(conf), conf); + return newRaftClient(getRpcType(conf), leader, RetryPolicies.noRetry(), + GrpcConfigKeys.OutputStream.OUTSTANDING_APPENDS_MAX_DEFAULT, + getClientRequestTimeout(conf), conf); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, - RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, + RetryPolicy retryPolicy, int maxOutstandingRequests, + GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout, Configuration configuration) { return newRaftClient(rpcType, leader.getId(), newRaftGroup(Collections.singletonList(leader)), retryPolicy, - tlsConfig, configuration); + maxOutstandingRequests, tlsConfig, clientRequestTimeout, configuration); } static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, - RetryPolicy retryPolicy, + RetryPolicy retryPolicy, int maxOutstandingRequests, + TimeDuration clientRequestTimeout, Configuration ozoneConfiguration) { return newRaftClient(rpcType, leader.getId(), - newRaftGroup(Collections.singletonList(leader)), retryPolicy, null, + newRaftGroup(Collections.singletonList(leader)), retryPolicy, + maxOutstandingRequests, null, clientRequestTimeout, ozoneConfiguration); } @SuppressWarnings("checkstyle:ParameterNumber") static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader, - RaftGroup group, RetryPolicy retryPolicy, - GrpcTlsConfig tlsConfig, Configuration ozoneConfiguration) { + RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest, + GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout, + Configuration ozoneConfiguration) { if (LOG.isTraceEnabled()) { LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); @@ -200,10 +220,16 @@ public interface RatisHelper { createRaftGrpcProperties(ozoneConfiguration, properties); RaftConfigKeys.Rpc.setType(properties, rpcType); + RaftClientConfigKeys.Rpc + .setRequestTimeout(properties, clientRequestTimeout); GrpcConfigKeys.setMessageSizeMax(properties, SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)); + // set async max outstanding requests. + RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, + maxOutStandingRequest); + RaftClient.Builder builder = RaftClient.newBuilder() .setRaftGroup(group) .setLeaderId(leader) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index ab17a52..737add0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -114,7 +114,12 @@ public final class ScmConfigKeys { "dfs.container.ratis.leader.pending.bytes.limit"; public static final String DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT = "1GB"; - + + public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY = + "dfs.ratis.client.request.timeout.duration"; + public static final TimeDuration + DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT = + TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY = "dfs.ratis.client.request.max.retries"; public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 3287777..f938448 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -170,6 +170,7 @@ public abstract class XceiverClientSpi implements Closeable { /** * Check if an specfic commitIndex is replicated to majority/all servers. * @param index index to watch for + * @param timeout timeout provided for the watch operation to complete * @return reply containing the min commit index replicated to all or majority * servers in case of a failure * @throws InterruptedException @@ -177,7 +178,7 @@ public abstract class XceiverClientSpi implements Closeable { * @throws TimeoutException * @throws IOException */ - public abstract XceiverClientReply watchForCommit(long index) + public abstract XceiverClientReply watchForCommit(long index, long timeout) throws InterruptedException, ExecutionException, TimeoutException, IOException; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index f2f4a6a..857f1de 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -134,6 +134,12 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT = "128MB"; + public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT = + "ozone.client.watch.request.timeout"; + + public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT = + "30s"; + public static final String OZONE_CLIENT_MAX_RETRIES = "ozone.client.max.retries"; public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 100; @@ -263,6 +269,11 @@ public final class OzoneConfigKeys { public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR = "dfs.container.ratis.datanode.storage.dir"; + public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY = + ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY; + public static final TimeDuration + DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT = + ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT; public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY = ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY; public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index a49c198..c8682bd 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -253,6 +253,14 @@ </description> </property> <property> + <name>dfs.ratis.client.request.timeout.duration</name> + <value>3s</value> + <tag>OZONE, RATIS, MANAGEMENT</tag> + <description>The timeout duration for ratis client request.It should be + set greater than leader election timeout in Ratis. + </description> + </property> + <property> <name>dfs.ratis.client.request.max.retries</name> <value>180</value> <tag>OZONE, RATIS, MANAGEMENT</tag> @@ -428,6 +436,14 @@ </description> </property> <property> + <name>ozone.client.watch.request.timeout</name> + <value>30s</value> + <tag>OZONE, CLIENT</tag> + <description>Timeout for the watch API in Ratis client to acknowledge + a particular request getting replayed to all servers. + </description> + </property> + <property> <name>ozone.client.max.retries</name> <value>100</value> <tag>OZONE, CLIENT</tag> diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 4af792a..1aa10d8 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -111,7 +111,7 @@ public final class BlockOutputStreamEntry extends OutputStream { this.outputStream = new BlockOutputStream(blockID, xceiverClientManager, pipeline, chunkSize, streamBufferFlushSize, - streamBufferMaxSize, bufferPool, checksumType, + streamBufferMaxSize, watchTimeout, bufferPool, checksumType, bytesPerChecksum); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 7cf55d6..28916f9 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -586,6 +586,11 @@ public class KeyOutputStream extends OutputStream { return this; } + public Builder setWatchTimeout(long timeout) { + this.watchTimeout = timeout; + return this; + } + public Builder setChecksumType(ChecksumType cType) { this.checksumType = cType; return this; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 2982e38..66b789f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -99,6 +99,7 @@ import java.net.URI; import java.security.InvalidKeyException; import java.security.SecureRandom; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -127,6 +128,7 @@ public class RpcClient implements ClientProtocol { private final long streamBufferFlushSize; private final long streamBufferMaxSize; private final long blockSize; + private final long watchTimeout; private final ClientId clientId = ClientId.randomId(); private final int maxRetryCount; private final long retryInterval; @@ -186,6 +188,10 @@ public class RpcClient implements ClientProtocol { StorageUnit.BYTES); blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES); + watchTimeout = + conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, + OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); int configuredChecksumSize = (int) conf.getStorageSize( OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, @@ -884,6 +890,7 @@ public class RpcClient implements ClientProtocol { .setFactor(openKey.getKeyInfo().getFactor()) .setStreamBufferFlushSize(streamBufferFlushSize) .setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) .setBlockSize(blockSize) .setBytesPerChecksum(bytesPerChecksum) .setChecksumType(checksumType) @@ -1184,6 +1191,7 @@ public class RpcClient implements ClientProtocol { .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue())) .setStreamBufferFlushSize(streamBufferFlushSize) .setStreamBufferMaxSize(streamBufferMaxSize) + .setWatchTimeout(watchTimeout) .setBlockSize(blockSize) .setChecksumType(checksumType) .setBytesPerChecksum(bytesPerChecksum) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index 3f5d33e..f862fd2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.ratis.RatisHelper; +import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.rpc.RpcClient; @@ -38,6 +39,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,8 +123,13 @@ public interface RatisTestHelper { RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException { final RaftPeer p = RatisHelper.toRaftPeer(dd); final OzoneConfiguration conf = new OzoneConfiguration(); + final int maxOutstandingRequests = + HddsClientUtils.getMaxOutstandingRequests(conf); + final TimeDuration requestTimeout = + RatisHelper.getClientRequestTimeout(conf); final RaftClient client = - newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), conf); + newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), + maxOutstandingRequests, requestTimeout, conf); client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId()); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java index fda6228..fd2cea3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java @@ -113,6 +113,8 @@ public class Test2WayCommitInRatis { @Test public void test2WayCommitForRetryfailure() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); startCluster(conf); GenericTestUtils.LogCapturer logCapturer = @@ -140,7 +142,7 @@ public class Test2WayCommitInRatis { .getCloseContainer(pipeline, container1.getContainerInfo().getContainerID())); reply.getResponse().get(); - xceiverClient.watchForCommit(reply.getLogIndex()); + xceiverClient.watchForCommit(reply.getLogIndex(), 20000); // commitInfo Map will be reduced to 2 here Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java index 96226d8..2b41012 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java @@ -76,6 +76,7 @@ public class TestBlockOutputStream { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java index 3f1d9ff..3cbe06d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java @@ -83,6 +83,7 @@ public class TestBlockOutputStreamWithFailures { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index c2444c1..e18b222 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -85,6 +85,7 @@ public class TestCloseContainerHandlingByClient { public static void init() throws Exception { chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java index eaceb04..8089ac3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java @@ -87,6 +87,7 @@ public class TestCommitWatcher { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); @@ -138,7 +139,7 @@ public class TestCommitWatcher { Assert.assertEquals(1, xceiverClient.getRefcount()); Assert.assertTrue(xceiverClient instanceof XceiverClientRatis); XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; - CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient); + CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000); BlockID blockID = ContainerTestHelper.getTestBlockID(containerId); List<XceiverClientReply> replies = new ArrayList<>(); long length = 0; @@ -212,7 +213,7 @@ public class TestCommitWatcher { Assert.assertEquals(1, xceiverClient.getRefcount()); Assert.assertTrue(xceiverClient instanceof XceiverClientRatis); XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient; - CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient); + CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000); BlockID blockID = ContainerTestHelper.getTestBlockID(containerId); List<XceiverClientReply> replies = new ArrayList<>(); long length = 0; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java index d1f2016..a7f5960 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -85,6 +85,8 @@ public class TestFailureHandlingByClient { conf = new OzoneConfiguration(); chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5, + TimeUnit.SECONDS); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS); conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java index bb7b6f0..d834350 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java @@ -75,6 +75,7 @@ public class TestKeyInputStream { flushSize = 4 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.setQuietMode(false); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java index f532f4d..5717f58 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java @@ -77,6 +77,8 @@ public class TestMultiBlockWritesWithDnFailures { conf = new OzoneConfiguration(); chunkSize = (int) OzoneConsts.MB; blockSize = 4 * chunkSize; + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5, + TimeUnit.SECONDS); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS); conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java index 0151c6e..6758f4f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java @@ -84,6 +84,7 @@ public class TestOzoneClientRetriesOnException { flushSize = 2 * chunkSize; maxFlushSize = 2 * flushSize; blockSize = 2 * maxFlushSize; + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); // conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS); conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE"); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java index 9f9d5af..1343a03 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java @@ -174,7 +174,7 @@ public class TestReadRetries { Assert.assertTrue(clientSpi instanceof XceiverClientRatis); XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi; - ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId()); + ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000); // shutdown the datanode cluster.shutdownHddsDatanode(datanodeDetails); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java index 645db6a..5808655 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java @@ -136,6 +136,8 @@ public class TestWatchForCommit { // and will be captured in keyOutputStream and the failover will happen // to a different block OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); conf.setTimeDuration( OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, @@ -269,9 +271,53 @@ public class TestWatchForCommit { } @Test + public void testWatchForCommitWithSmallerTimeoutValue() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, + TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); + startCluster(conf); + XceiverClientManager clientManager = new XceiverClientManager(conf); + ContainerWithPipeline container1 = storageContainerLocationClient + .allocateContainer(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, OzoneConsts.OZONE); + XceiverClientSpi xceiverClient = clientManager + .acquireClient(container1.getPipeline()); + Assert.assertEquals(1, xceiverClient.getRefcount()); + Assert.assertEquals(container1.getPipeline(), + xceiverClient.getPipeline()); + Pipeline pipeline = xceiverClient.getPipeline(); + XceiverClientReply reply = xceiverClient.sendCommandAsync( + ContainerTestHelper.getCreateContainerRequest( + container1.getContainerInfo().getContainerID(), + xceiverClient.getPipeline())); + reply.getResponse().get(); + long index = reply.getLogIndex(); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(0)); + cluster.shutdownHddsDatanode(pipeline.getNodes().get(1)); + try { + // just watch for a log index which in not updated in the commitInfo Map + // as well as there is no logIndex generate in Ratis. + // The basic idea here is just to test if its throws an exception. + xceiverClient + .watchForCommit(index + new Random().nextInt(100) + 10, 3000); + Assert.fail("expected exception not thrown"); + } catch (Exception e) { + Assert.assertTrue( + HddsClientUtils.checkForException(e) instanceof TimeoutException); + } + // After releasing the xceiverClient, this connection should be closed + // and any container operations should fail + clientManager.releaseClient(xceiverClient, false); + shutdown(); + } + + @Test public void testWatchForCommitForRetryfailure() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); - conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, + 100, TimeUnit.SECONDS); + conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); startCluster(conf); XceiverClientManager clientManager = new XceiverClientManager(conf); ContainerWithPipeline container1 = storageContainerLocationClient @@ -297,7 +343,7 @@ public class TestWatchForCommit { // as well as there is no logIndex generate in Ratis. // The basic idea here is just to test if its throws an exception. xceiverClient - .watchForCommit(index + new Random().nextInt(100) + 10); + .watchForCommit(index + new Random().nextInt(100) + 10, 20000); Assert.fail("expected exception not thrown"); } catch (Exception e) { Assert.assertTrue(e instanceof ExecutionException); @@ -314,8 +360,9 @@ public class TestWatchForCommit { @Test public void test2WayCommitForTimeoutException() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3, + TimeUnit.SECONDS); conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); - conf.set("raft.client.watch.request.timeout", "3s"); startCluster(conf); GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG); @@ -342,7 +389,7 @@ public class TestWatchForCommit { .getCloseContainer(pipeline, container1.getContainerInfo().getContainerID())); reply.getResponse().get(); - xceiverClient.watchForCommit(reply.getLogIndex()); + xceiverClient.watchForCommit(reply.getLogIndex(), 3000); // commitInfo Map will be reduced to 2 here Assert.assertEquals(2, ratisClient.getCommitInfoMap().size()); @@ -358,6 +405,8 @@ public class TestWatchForCommit { @Test public void testWatchForCommitForGroupMismatchException() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20, + TimeUnit.SECONDS); conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20); // mark the node stale early so that pipleline gets destroyed quickly @@ -391,7 +440,8 @@ public class TestWatchForCommit { // as well as there is no logIndex generate in Ratis. // The basic idea here is just to test if its throws an exception. xceiverClient - .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10); + .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10, + 20000); Assert.fail("Expected exception not thrown"); } catch(Exception e) { Assert.assertTrue(HddsClientUtils diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java index 7857e1f..fdcb822 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.junit.Assert; import org.junit.Test; @@ -39,6 +40,7 @@ public abstract class TestDataValidate { * */ static void startCluster(OzoneConfiguration conf) throws Exception { + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(5).build(); cluster.waitForClusterToBeReady(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java index bef3330..315d1ee 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.freon; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.test.GenericTestUtils; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.raftlog.RaftLog; @@ -67,6 +68,7 @@ public class TestOzoneClientKeyGenerator { if (conf == null) { conf = new OzoneConfiguration(); } + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(5) .build(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java index 218c570..45ea23d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -44,6 +45,7 @@ public class TestRandomKeyGenerator { @BeforeClass public static void init() throws Exception { conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms"); cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build(); cluster.waitForClusterToBeReady(); } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java index 16973ac..c4c84cb 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java @@ -181,7 +181,7 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements XceiverClientReply xceiverClientReply = xceiverClientSpi.sendCommandAsync(request); xceiverClientSpi - .watchForCommit(xceiverClientReply.getLogIndex()); + .watchForCommit(xceiverClientReply.getLogIndex(), 1000L); } else { xceiverClientSpi.sendCommand(request); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
