[hadoop] 37/50: HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.

2019-07-25 Thread cliang
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7b1e3c49d1508a30e6b8507cd5a99732e338e2d9
Author: Erik Krogen 
AuthorDate: Thu Dec 13 14:31:41 2018 -0800

HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.
---
 .../org/apache/hadoop/ipc/AlignmentContext.java| 12 -
 .../main/java/org/apache/hadoop/ipc/Server.java| 31 -
 .../org/apache/hadoop/hdfs/ClientGSIContext.java   |  7 ++-
 .../hdfs/server/namenode/GlobalStateIdContext.java | 52 +++---
 .../hadoop/hdfs/server/namenode/ha/HATestUtil.java | 20 +
 .../server/namenode/ha/TestMultiObserverNode.java  | 14 ++
 6 files changed, 115 insertions(+), 21 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index a435ff6..bcddfbf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@@ -64,9 +66,15 @@ public interface AlignmentContext {
* client state info during RPC response header processing.
*
* @param header The RPC request header.
-   * @return state id of in the request header.
+   * @param threshold a parameter to verify a condition when server
+   *should reject client request due to its state being too far
+   *misaligned with the client state.
+   *See implementation for more details.
+   * @return state id required for the server to execute the call.
+   * @throws IOException
*/
-  long receiveRequestState(RpcRequestHeaderProto header);
+  long receiveRequestState(RpcRequestHeaderProto header, long threshold)
+  throws IOException;
 
   /**
* Returns the last seen state id of the alignment context instance.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 1cbf8b8..3d49b68 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -2531,6 +2531,7 @@ public abstract class Server {
 
   // Save the priority level assignment by the scheduler
   call.setPriorityLevel(callQueue.getPriorityLevel(call));
+  call.markCallCoordinated(false);
   if(alignmentContext != null && call.rpcRequest != null &&
   (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
 // if call.rpcRequest is not RpcProtobufRequest, will skip the following
@@ -2539,23 +2540,21 @@ public abstract class Server {
 // coordinated.
 String methodName;
 String protoName;
+ProtobufRpcEngine.RpcProtobufRequest req =
+(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
 try {
-  ProtobufRpcEngine.RpcProtobufRequest req =
-  (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
   methodName = req.getRequestHeader().getMethodName();
   protoName = req.getRequestHeader().getDeclaringClassProtocolName();
+  if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
+call.markCallCoordinated(true);
+long stateId;
+stateId = alignmentContext.receiveRequestState(
+header, getMaxIdleTime());
+call.setClientStateId(stateId);
+  }
 } catch (IOException ioe) {
-  throw new RpcServerException("Rpc request header check fail", ioe);
-}
-if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
-  call.markCallCoordinated(false);
-} else {
-  call.markCallCoordinated(true);
-  long stateId = alignmentContext.receiveRequestState(header);
-  call.setClientStateId(stateId);
+  throw new RpcServerException("Processing RPC request caught ", ioe);
 }
-  } else {
-call.markCallCoordinated(false);
   }
 
   try {
@@ -3607,4 +3606,12 @@ public abstract class Server {
   idleScanTimer.schedule(idleScanTask, idleScanInterval);
 }
   }
+
+  protected int getMaxIdleTime() {
+return connectionManager.maxIdleTime;
+  }
+
+  public String getServerName() {
+return serverName;
+  }
 }
diff --git 

[hadoop] 37/50: HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.

2019-06-28 Thread cliang
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c1c061d767381023eed829ddbb0af4d32db2493b
Author: Erik Krogen 
AuthorDate: Thu Dec 13 14:31:41 2018 -0800

HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.
---
 .../org/apache/hadoop/ipc/AlignmentContext.java| 12 -
 .../main/java/org/apache/hadoop/ipc/Server.java| 27 ++-
 .../org/apache/hadoop/hdfs/ClientGSIContext.java   |  7 ++-
 .../hdfs/server/namenode/GlobalStateIdContext.java | 52 +++---
 .../hadoop/hdfs/server/namenode/ha/HATestUtil.java | 20 +
 .../server/namenode/ha/TestMultiObserverNode.java  | 14 ++
 6 files changed, 111 insertions(+), 21 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index a435ff6..bcddfbf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@@ -64,9 +66,15 @@ public interface AlignmentContext {
* client state info during RPC response header processing.
*
* @param header The RPC request header.
-   * @return state id of in the request header.
+   * @param threshold a parameter to verify a condition when server
+   *should reject client request due to its state being too far
+   *misaligned with the client state.
+   *See implementation for more details.
+   * @return state id required for the server to execute the call.
+   * @throws IOException
*/
-  long receiveRequestState(RpcRequestHeaderProto header);
+  long receiveRequestState(RpcRequestHeaderProto header, long threshold)
+  throws IOException;
 
   /**
* Returns the last seen state id of the alignment context instance.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 20fee61..4f11541 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -2536,6 +2536,7 @@ public abstract class Server {
 
   // Save the priority level assignment by the scheduler
   call.setPriorityLevel(callQueue.getPriorityLevel(call));
+  call.markCallCoordinated(false);
   if(alignmentContext != null && call.rpcRequest != null &&
   (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
 // if call.rpcRequest is not RpcProtobufRequest, will skip the following
@@ -2544,23 +2545,21 @@ public abstract class Server {
 // coordinated.
 String methodName;
 String protoName;
+ProtobufRpcEngine.RpcProtobufRequest req =
+(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
 try {
-  ProtobufRpcEngine.RpcProtobufRequest req =
-  (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
   methodName = req.getRequestHeader().getMethodName();
   protoName = req.getRequestHeader().getDeclaringClassProtocolName();
+  if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
+call.markCallCoordinated(true);
+long stateId;
+stateId = alignmentContext.receiveRequestState(
+header, getMaxIdleTime());
+call.setClientStateId(stateId);
+  }
 } catch (IOException ioe) {
-  throw new RpcServerException("Rpc request header check fail", ioe);
-}
-if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
-  call.markCallCoordinated(false);
-} else {
-  call.markCallCoordinated(true);
-  long stateId = alignmentContext.receiveRequestState(header);
-  call.setClientStateId(stateId);
+  throw new RpcServerException("Processing RPC request caught ", ioe);
 }
-  } else {
-call.markCallCoordinated(false);
   }
 
   try {
@@ -3613,6 +3612,10 @@ public abstract class Server {
 }
   }
 
+  protected int getMaxIdleTime() {
+return connectionManager.maxIdleTime;
+  }
+
   public String getServerName() {
 return serverName;
   }
diff --git