[hadoop] 37/50: HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.
This is an automated email from the ASF dual-hosted git repository. cliang pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/hadoop.git commit 7b1e3c49d1508a30e6b8507cd5a99732e338e2d9 Author: Erik Krogen AuthorDate: Thu Dec 13 14:31:41 2018 -0800 HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko. --- .../org/apache/hadoop/ipc/AlignmentContext.java| 12 - .../main/java/org/apache/hadoop/ipc/Server.java| 31 - .../org/apache/hadoop/hdfs/ClientGSIContext.java | 7 ++- .../hdfs/server/namenode/GlobalStateIdContext.java | 52 +++--- .../hadoop/hdfs/server/namenode/ha/HATestUtil.java | 20 + .../server/namenode/ha/TestMultiObserverNode.java | 14 ++ 6 files changed, 115 insertions(+), 21 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index a435ff6..bcddfbf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; @@ -64,9 +66,15 @@ public interface AlignmentContext { * client state info during RPC response header processing. * * @param header The RPC request header. - * @return state id of in the request header. + * @param threshold a parameter to verify a condition when server + *should reject client request due to its state being too far + *misaligned with the client state. + *See implementation for more details. + * @return state id required for the server to execute the call. + * @throws IOException */ - long receiveRequestState(RpcRequestHeaderProto header); + long receiveRequestState(RpcRequestHeaderProto header, long threshold) + throws IOException; /** * Returns the last seen state id of the alignment context instance. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 1cbf8b8..3d49b68 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2531,6 +2531,7 @@ public abstract class Server { // Save the priority level assignment by the scheduler call.setPriorityLevel(callQueue.getPriorityLevel(call)); + call.markCallCoordinated(false); if(alignmentContext != null && call.rpcRequest != null && (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) { // if call.rpcRequest is not RpcProtobufRequest, will skip the following @@ -2539,23 +2540,21 @@ public abstract class Server { // coordinated. String methodName; String protoName; +ProtobufRpcEngine.RpcProtobufRequest req = +(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; try { - ProtobufRpcEngine.RpcProtobufRequest req = - (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; methodName = req.getRequestHeader().getMethodName(); protoName = req.getRequestHeader().getDeclaringClassProtocolName(); + if (alignmentContext.isCoordinatedCall(protoName, methodName)) { +call.markCallCoordinated(true); +long stateId; +stateId = alignmentContext.receiveRequestState( +header, getMaxIdleTime()); +call.setClientStateId(stateId); + } } catch (IOException ioe) { - throw new RpcServerException("Rpc request header check fail", ioe); -} -if (!alignmentContext.isCoordinatedCall(protoName, methodName)) { - call.markCallCoordinated(false); -} else { - call.markCallCoordinated(true); - long stateId = alignmentContext.receiveRequestState(header); - call.setClientStateId(stateId); + throw new RpcServerException("Processing RPC request caught ", ioe); } - } else { -call.markCallCoordinated(false); } try { @@ -3607,4 +3606,12 @@ public abstract class Server { idleScanTimer.schedule(idleScanTask, idleScanInterval); } } + + protected int getMaxIdleTime() { +return connectionManager.maxIdleTime; + } + + public String getServerName() { +return serverName; + } } diff --git
[hadoop] 37/50: HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.
This is an automated email from the ASF dual-hosted git repository. cliang pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/hadoop.git commit c1c061d767381023eed829ddbb0af4d32db2493b Author: Erik Krogen AuthorDate: Thu Dec 13 14:31:41 2018 -0800 HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko. --- .../org/apache/hadoop/ipc/AlignmentContext.java| 12 - .../main/java/org/apache/hadoop/ipc/Server.java| 27 ++- .../org/apache/hadoop/hdfs/ClientGSIContext.java | 7 ++- .../hdfs/server/namenode/GlobalStateIdContext.java | 52 +++--- .../hadoop/hdfs/server/namenode/ha/HATestUtil.java | 20 + .../server/namenode/ha/TestMultiObserverNode.java | 14 ++ 6 files changed, 111 insertions(+), 21 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index a435ff6..bcddfbf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; @@ -64,9 +66,15 @@ public interface AlignmentContext { * client state info during RPC response header processing. * * @param header The RPC request header. - * @return state id of in the request header. + * @param threshold a parameter to verify a condition when server + *should reject client request due to its state being too far + *misaligned with the client state. + *See implementation for more details. + * @return state id required for the server to execute the call. + * @throws IOException */ - long receiveRequestState(RpcRequestHeaderProto header); + long receiveRequestState(RpcRequestHeaderProto header, long threshold) + throws IOException; /** * Returns the last seen state id of the alignment context instance. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 20fee61..4f11541 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2536,6 +2536,7 @@ public abstract class Server { // Save the priority level assignment by the scheduler call.setPriorityLevel(callQueue.getPriorityLevel(call)); + call.markCallCoordinated(false); if(alignmentContext != null && call.rpcRequest != null && (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) { // if call.rpcRequest is not RpcProtobufRequest, will skip the following @@ -2544,23 +2545,21 @@ public abstract class Server { // coordinated. String methodName; String protoName; +ProtobufRpcEngine.RpcProtobufRequest req = +(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; try { - ProtobufRpcEngine.RpcProtobufRequest req = - (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; methodName = req.getRequestHeader().getMethodName(); protoName = req.getRequestHeader().getDeclaringClassProtocolName(); + if (alignmentContext.isCoordinatedCall(protoName, methodName)) { +call.markCallCoordinated(true); +long stateId; +stateId = alignmentContext.receiveRequestState( +header, getMaxIdleTime()); +call.setClientStateId(stateId); + } } catch (IOException ioe) { - throw new RpcServerException("Rpc request header check fail", ioe); -} -if (!alignmentContext.isCoordinatedCall(protoName, methodName)) { - call.markCallCoordinated(false); -} else { - call.markCallCoordinated(true); - long stateId = alignmentContext.receiveRequestState(header); - call.setClientStateId(stateId); + throw new RpcServerException("Processing RPC request caught ", ioe); } - } else { -call.markCallCoordinated(false); } try { @@ -3613,6 +3612,10 @@ public abstract class Server { } } + protected int getMaxIdleTime() { +return connectionManager.maxIdleTime; + } + public String getServerName() { return serverName; } diff --git