This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new eeb4ac7 HDDS-2242. Avoid unnecessary rpc needed to discover the
pipeline leader. (#313)
eeb4ac7 is described below
commit eeb4ac70607500675037b57b88fd463b22918781
Author: Siddharth <[email protected]>
AuthorDate: Fri Dec 13 13:31:08 2019 +0530
HDDS-2242. Avoid unnecessary rpc needed to discover the pipeline leader.
(#313)
---
.../org/apache/hadoop/hdds/ratis/RatisHelper.java | 13 +--
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 20 ++++
.../hadoop/hdds/scm/TestRatisPipelineLeader.java | 129 +++++++++++++++++++++
3 files changed, 154 insertions(+), 8 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 38f01ce..7c36c26 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hdds.ratis;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -143,7 +141,8 @@ public interface RatisHelper {
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
RetryPolicy retryPolicy, int maxOutStandingRequest,
GrpcTlsConfig tlsConfig, TimeDuration timeout) throws IOException {
- return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()),
+ return newRaftClient(rpcType,
+ toRaftPeerId(pipeline.getLeaderNode()),
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
pipeline.getNodes()), retryPolicy, maxOutStandingRequest,
tlsConfig,
timeout);
@@ -158,16 +157,14 @@ public interface RatisHelper {
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
.getDuration(), timeUnit);
- final TimeDuration clientRequestTimeout =
- TimeDuration.valueOf(duration, timeUnit);
- return clientRequestTimeout;
+ return TimeDuration.valueOf(duration, timeUnit);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy, int maxOutstandingRequests,
GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
return newRaftClient(rpcType, leader.getId(),
- newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
+ newRaftGroup(Collections.singletonList(leader)), retryPolicy,
maxOutstandingRequests, tlsConfig, clientRequestTimeout);
}
@@ -175,7 +172,7 @@ public interface RatisHelper {
RetryPolicy retryPolicy, int maxOutstandingRequests,
TimeDuration clientRequestTimeout) {
return newRaftClient(rpcType, leader.getId(),
- newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
+ newRaftGroup(Collections.singletonList(leader)), retryPolicy,
maxOutstandingRequests, null, clientRequestTimeout);
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 66b1efa..bceb647 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -132,6 +133,25 @@ public final class Pipeline {
return new ArrayList<>(nodeStatus.keySet());
}
+  /**
+   * Returns the pipeline's leader if it is known, otherwise the closest node.
+   *
+   * <p>The leader is the member whose UUID equals {@code leaderId}. Handing it
+   * to the Ratis client lets the first request go straight to the leader
+   * instead of being redirected. If no member matches (for example when
+   * {@code leaderId} is null), this falls back to {@link #getClosestNode()}.
+   *
+   * @return {@link DatanodeDetails} of the leader, or of the closest node
+   * @throws IOException if the pipeline has no nodes, or as propagated from
+   *         {@link #getClosestNode()}
+   */
+  public DatanodeDetails getLeaderNode() throws IOException {
+    if (nodeStatus.isEmpty()) {
+      throw new IOException(String.format("Pipeline=%s is empty", id));
+    }
+    // A plain loop avoids the Optional isPresent()/get() dance and keeps
+    // getClosestNode() lazy: it is consulted only when no member matches.
+    for (DatanodeDetails node : nodeStatus.keySet()) {
+      if (node.getUuid().equals(leaderId)) {
+        return node;
+      }
+    }
+    return getClosestNode();
+  }
+
public DatanodeDetails getFirstNode() throws IOException {
if (nodeStatus.isEmpty()) {
throw new IOException(String.format("Pipeline=%s is empty", id));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java
new file mode 100644
index 0000000..f5b7eaf
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestRatisPipelineLeader.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration tests checking that the leader recorded for a RATIS pipeline is
+ * accurate, and that a client built from that pipeline reaches the leader on
+ * its first attempt.
+ */
+public class TestRatisPipelineLeader {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster
+        .newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testLeaderIdUsedOnFirstCall() throws Exception {
+    Pipeline ratisPipeline = getFirstHealthyRatisPipeline();
+    // Verify correct leader info populated
+    verifyLeaderInfo(ratisPipeline);
+    // Verify client connects to Leader without NotLeaderException
+    XceiverClientRatis xceiverClientRatis =
+        XceiverClientRatis.newXceiverClientRatis(ratisPipeline, conf);
+    // DEBUG is raised only for this test and restored afterwards so that the
+    // other tests sharing this JVM are not affected.
+    Logger grpcLogger = Logger.getLogger(GrpcClientProtocolService.class);
+    Level previousLevel = grpcLogger.getLevel();
+    grpcLogger.setLevel(Level.DEBUG);
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(GrpcClientProtocolService.LOG);
+    try {
+      xceiverClientRatis.connect();
+      ContainerProtocolCalls.createContainer(xceiverClientRatis, 1L, null);
+    } finally {
+      // Clean up even when connect/createContainer throws.
+      logCapturer.stopCapturing();
+      grpcLogger.setLevel(previousLevel);
+      xceiverClientRatis.close();
+    }
+    Assert.assertFalse(
+        "Client should connect to pipeline leader on first try.",
+        logCapturer.getOutput().contains(
+            "org.apache.ratis.protocol.NotLeaderException"));
+  }
+
+  @Test(timeout = 120000)
+  public void testLeaderIdAfterLeaderChange() throws Exception {
+    Pipeline ratisPipeline = getFirstHealthyRatisPipeline();
+    // NOTE(review): the datanode stopped here is chosen to be NOT the
+    // recorded leader, so no leader change is forced; this really checks
+    // that the leader info stays accurate after losing a non-leader node.
+    // The datanode is never restarted, so it stays down for the rest of the
+    // class-wide cluster; if JUnit runs this method first, the other test
+    // runs against a degraded cluster. Consider restarting it afterwards.
+    Optional<HddsDatanodeService> dnToStop =
+        cluster.getHddsDatanodes().stream().filter(s ->
+            !s.getDatanodeStateMachine().getDatanodeDetails().getUuid()
+                .equals(ratisPipeline.getLeaderId())).findAny();
+    Assert.assertTrue(dnToStop.isPresent());
+    dnToStop.get().stop();
+    GenericTestUtils.waitFor(() -> ratisPipeline.isHealthy(), 300, 5000);
+    verifyLeaderInfo(ratisPipeline);
+  }
+
+  /**
+   * Returns the first RATIS pipeline known to SCM, failing the test if there
+   * is none or if it is not healthy.
+   *
+   * <p>NOTE(review): any RATIS pipeline is accepted; if the cluster also has
+   * single-node RATIS pipelines, one of those could be picked, which would
+   * not exercise leader discovery. Consider also filtering on the
+   * replication factor.
+   */
+  private Pipeline getFirstHealthyRatisPipeline() {
+    List<Pipeline> pipelines = cluster.getStorageContainerManager()
+        .getPipelineManager().getPipelines();
+    Optional<Pipeline> ratisPipeline = pipelines.stream().filter(p ->
+        p.getType().equals(HddsProtos.ReplicationType.RATIS)).findFirst();
+    Assert.assertTrue(ratisPipeline.isPresent());
+    Assert.assertTrue(ratisPipeline.get().isHealthy());
+    return ratisPipeline.get();
+  }
+
+  /**
+   * Asserts that the datanode recorded as {@code ratisPipeline}'s leader
+   * reports leader info for that pipeline's Raft group.
+   */
+  private void verifyLeaderInfo(Pipeline ratisPipeline) throws Exception {
+    Optional<HddsDatanodeService> hddsDatanodeService =
+        cluster.getHddsDatanodes().stream().filter(s ->
+            s.getDatanodeStateMachine().getDatanodeDetails().getUuid()
+                .equals(ratisPipeline.getLeaderId())).findFirst();
+    Assert.assertTrue(hddsDatanodeService.isPresent());
+
+    XceiverServerRatis serverRatis =
+        (XceiverServerRatis) hddsDatanodeService.get()
+            .getDatanodeStateMachine().getContainer().getWriteChannel();
+
+    GroupInfoRequest groupInfoRequest = new GroupInfoRequest(
+        ClientId.randomId(), serverRatis.getServer().getId(),
+        RaftGroupId.valueOf(ratisPipeline.getId().getId()), 100);
+    GroupInfoReply reply =
+        serverRatis.getServer().getGroupInfo(groupInfoRequest);
+    // Presumably leader info is only populated while the server's role is
+    // LEADER; confirm against the Ratis RoleInfoProto definition.
+    Assert.assertTrue(reply.getRoleInfoProto().hasLeaderInfo());
+    // The server was selected by matching leaderId, so this mainly confirms
+    // the request was answered by the intended peer.
+    Assert.assertEquals(ratisPipeline.getLeaderId().toString(),
+        reply.getRoleInfoProto().getSelf().getId().toStringUtf8());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]