HDFS-13331. [SBN read] Add lastSeenStateId to RpcRequestHeader. Contributed by 
Plamen Jeliazkov.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f8ee2123
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f8ee2123
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f8ee2123

Branch: refs/heads/HDFS-12943
Commit: f8ee2123dbb71f4e4f8c18307614a53ffcc32487
Parents: 2bc0fd5
Author: Erik Krogen <xkro...@apache.org>
Authored: Wed Apr 4 15:42:39 2018 -0700
Committer: Erik Krogen <xkro...@apache.org>
Committed: Wed Apr 4 15:42:39 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/ipc/AlignmentContext.java | 14 +++
 .../main/java/org/apache/hadoop/ipc/Client.java |  2 +-
 .../main/java/org/apache/hadoop/ipc/Server.java |  5 ++
 .../java/org/apache/hadoop/util/ProtoUtil.java  | 13 +++
 .../src/main/proto/RpcHeader.proto              |  1 +
 .../apache/hadoop/hdfs/ClientGCIContext.java    | 30 +++++--
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 10 ++-
 .../server/namenode/GlobalStateIdContext.java   | 26 +++++-
 .../hadoop/hdfs/TestStateAlignmentContext.java  | 89 +++++++++++++++++++-
 9 files changed, 173 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index f952325..66d6edc 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 /**
@@ -48,4 +49,17 @@ public interface AlignmentContext {
    */
   void receiveResponseState(RpcResponseHeaderProto header);
 
+  /**
+   * This is the intended client method call to pull last seen state info
+   * into RPC request processing.
+   * @param header The RPC request header builder.
+   */
+  void updateRequestState(RpcRequestHeaderProto.Builder header);
+
+  /**
+   * This is the intended server method call to implement to receive
+   * client state info during RPC response header processing.
+   * @param header The RPC request header.
+   */
+  void receiveRequestState(RpcRequestHeaderProto header);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d54f383..26b61c0 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -1106,7 +1106,7 @@ public class Client implements AutoCloseable {
       // Items '1' and '2' are prepared here. 
       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
           call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
-          clientId);
+          clientId, alignmentContext);
 
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 8b3513a..ff4a63a 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -2475,6 +2475,11 @@ public abstract class Server {
         }
       }
 
+      if (alignmentContext != null) {
+        // Check incoming RPC request's state.
+        alignmentContext.receiveRequestState(header);
+      }
+
       CallerContext callerContext = null;
       if (header.hasCallerContext()) {
         callerContext =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
index 1a5acba..9a0b05c 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.util;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RPC;
 import 
org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
@@ -165,6 +166,13 @@ public abstract class ProtoUtil {
   public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
       RpcRequestHeaderProto.OperationProto operation, int callId,
       int retryCount, byte[] uuid) {
+    return makeRpcRequestHeader(rpcKind, operation, callId, retryCount, uuid,
+        null);
+  }
+
+  public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
+      RpcRequestHeaderProto.OperationProto operation, int callId,
+      int retryCount, byte[] uuid, AlignmentContext alignmentContext) {
     RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
     result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
         .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
@@ -190,6 +198,11 @@ public abstract class ProtoUtil {
       result.setCallerContext(contextBuilder);
     }
 
+    // Add alignment context if it is not null
+    if (alignmentContext != null) {
+      alignmentContext.updateRequestState(result);
+    }
+
     return result.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto 
b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
index bfe1301..e8d8cbb 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
@@ -90,6 +90,7 @@ message RpcRequestHeaderProto { // the header for the 
RpcRequest
   optional sint32 retryCount = 5 [default = -1];
   optional RPCTraceInfoProto traceInfo = 6; // tracing info
   optional RPCCallerContextProto callerContext = 7; // call context
+  optional int64 stateId = 8; // The last seen Global State ID
 }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
index 3d722f8..0d0bd25 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 import java.util.concurrent.atomic.LongAccumulator;
@@ -33,16 +34,11 @@ import java.util.concurrent.atomic.LongAccumulator;
 @InterfaceStability.Stable
 class ClientGCIContext implements AlignmentContext {
 
-  private final DFSClient dfsClient;
   private final LongAccumulator lastSeenStateId =
       new LongAccumulator(Math::max, Long.MIN_VALUE);
 
-  /**
-   * Client side constructor.
-   * @param dfsClient client side state receiver
-   */
-  ClientGCIContext(DFSClient dfsClient) {
-    this.dfsClient = dfsClient;
+  long getLastSeenStateId() {
+    return lastSeenStateId.get();
   }
 
   /**
@@ -55,11 +51,27 @@ class ClientGCIContext implements AlignmentContext {
   }
 
   /**
-   * Client side implementation for receiving state alignment info.
+   * Client side implementation for receiving state alignment info in 
responses.
    */
   @Override
   public void receiveResponseState(RpcResponseHeaderProto header) {
     lastSeenStateId.accumulate(header.getStateId());
-    dfsClient.lastSeenStateId = lastSeenStateId.get();
+  }
+
+  /**
+   * Client side implementation for providing state alignment info in requests.
+   */
+  @Override
+  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+    header.setStateId(lastSeenStateId.longValue());
+  }
+
+  /**
+   * Client side implementation only provides state alignment info in requests.
+   * Client does not receive RPC requests therefore this does nothing.
+   */
+  @Override
+  public void receiveRequestState(RpcRequestHeaderProto header) {
+    // Do nothing.
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 0b90e9a..69ad097 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -216,7 +216,6 @@ public class DFSClient implements java.io.Closeable, 
RemotePeerFactory,
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
   volatile long lastLeaseRenewal;
-  volatile long lastSeenStateId;
   private volatile FsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
   final String clientName;
@@ -239,6 +238,7 @@ public class DFSClient implements java.io.Closeable, 
RemotePeerFactory,
   private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final int smallBufferSize;
   private final long serverDefaultsValidityPeriod;
+  private final ClientGCIContext alignmentContext;
 
   public DfsClientConf getConf() {
     return dfsClientConf;
@@ -394,7 +394,8 @@ public class DFSClient implements java.io.Closeable, 
RemotePeerFactory,
     this.saslClient = new SaslDataTransferClient(
         conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
         TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
-    Client.setAlignmentContext(new ClientGCIContext(this));
+    this.alignmentContext = new ClientGCIContext();
+    Client.setAlignmentContext(alignmentContext);
   }
 
   /**
@@ -543,6 +544,11 @@ public class DFSClient implements java.io.Closeable, 
RemotePeerFactory,
     return clientRunning;
   }
 
+  @VisibleForTesting
+  ClientGCIContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
   long getLastLeaseRenewal() {
     return lastLeaseRenewal;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
index 2d7d94e..f0ebf98 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 /**
@@ -41,7 +42,7 @@ class GlobalStateIdContext implements AlignmentContext {
   }
 
   /**
-   * Server side implementation for providing state alignment info.
+   * Server side implementation for providing state alignment info in 
responses.
    */
   @Override
   public void updateResponseState(RpcResponseHeaderProto.Builder header) {
@@ -56,4 +57,27 @@ class GlobalStateIdContext implements AlignmentContext {
   public void receiveResponseState(RpcResponseHeaderProto header) {
     // Do nothing.
   }
+
+  /**
+   * Server side implementation only receives state alignment info.
+   * It does not build RPC requests therefore this does nothing.
+   */
+  @Override
+  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+    // Do nothing.
+  }
+
+  /**
+   * Server side implementation for processing state alignment info in 
requests.
+   */
+  @Override
+  public void receiveRequestState(RpcRequestHeaderProto header) {
+    long serverStateId = namesystem.getLastWrittenTransactionId();
+    long clientStateId = header.getStateId();
+    if (clientStateId > serverStateId) {
+      FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
+          ", but server state is: " + serverStateId);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f8ee2123/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
index 590f702..ce4639f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
@@ -18,20 +18,30 @@
 
 package org.apache.hadoop.hdfs;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertThat;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Class is used to test server sending state alignment information to clients
@@ -91,7 +101,7 @@ public class TestStateAlignmentContext {
   public void testStateTransferOnWrite() throws Exception {
     long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
     DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
-    long clientState = dfs.dfs.lastSeenStateId;
+    long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
     long postWriteState = 
cluster.getNamesystem().getLastWrittenTransactionId();
     // Write(s) should have increased state. Check for greater than.
     assertThat(clientState > preWriteState, is(true));
@@ -109,7 +119,8 @@ public class TestStateAlignmentContext {
     long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
     DFSTestUtil.readFile(dfs, new Path("/testFile2"));
     // Read should catch client up to last written state.
-    assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
+    long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
+    assertThat(clientState, is(lastWrittenId));
   }
 
   /**
@@ -122,10 +133,80 @@ public class TestStateAlignmentContext {
     long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
     try (DistributedFileSystem clearDfs =
              (DistributedFileSystem) FileSystem.get(CONF)) {
-      assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
+      ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext();
+      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
       DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
-      assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
+      assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
     }
   }
 
+  /**
+   * This test mocks an AlignmentContext and ensures that DFSClient
+   * writes its lastSeenStateId into RPC requests.
+   */
+  @Test
+  public void testClientSendsState() throws Exception {
+    AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
+    AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
+    Client.setAlignmentContext(spiedAlignContext);
+
+    // Collect RpcRequestHeaders for verification later.
+    final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders 
=
+        new ArrayList<>();
+    Mockito.doAnswer(a -> {
+      Object[] arguments = a.getArguments();
+      RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+          (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+      collectedHeaders.add(header);
+      return a.callRealMethod();
+    }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
+
+    // Ensure first header and last header have different state.
+    assertThat(collectedHeaders.size() > 1, is(true));
+    assertThat(collectedHeaders.get(0).getStateId(),
+        is(not(collectedHeaders.get(collectedHeaders.size() - 1))));
+
+    // Ensure collected RpcRequestHeaders are in increasing order.
+    long lastHeader = collectedHeaders.get(0).getStateId();
+    for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
+        collectedHeaders.subList(1, collectedHeaders.size())) {
+      long currentHeader = header.getStateId();
+      assertThat(currentHeader >= lastHeader, is(true));
+      lastHeader = header.getStateId();
+    }
+  }
+
+  /**
+   * This test mocks an AlignmentContext to send stateIds greater than
+   * server's stateId in RPC requests.
+   */
+  @Test
+  public void testClientSendsGreaterState() throws Exception {
+    AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
+    AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
+    Client.setAlignmentContext(spiedAlignContext);
+
+    // Make every client call have a stateId > server's stateId.
+    Mockito.doAnswer(a -> {
+      Object[] arguments = a.getArguments();
+      RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+          (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+      try {
+        return a.callRealMethod();
+      } finally {
+        header.setStateId(Long.MAX_VALUE);
+      }
+    }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
+    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
+    logCapturer.stopCapturing();
+
+    String output = logCapturer.getOutput();
+    assertThat(output, containsString("A client sent stateId: "));
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to