HADOOP-13168. Support Future.get with timeout in ipc async calls.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa7c7f25
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa7c7f25
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa7c7f25

Branch: refs/heads/branch-2.8
Commit: fa7c7f25105bc157c652a2f44ac49620fc61c0f4
Parents: ac04900
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Thu May 19 15:58:15 2016 -0700
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Thu May 26 12:22:57 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Client.java | 119 +++++++++---------
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  62 +++++-----
 .../apache/hadoop/util/concurrent/AsyncGet.java |  60 +++++++++
 .../hadoop/util/concurrent/AsyncGetFuture.java  |  73 +++++++++++
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     | 122 +++++++++++--------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  26 +---
 .../ClientNamenodeProtocolTranslatorPB.java     |  33 ++---
 7 files changed, 312 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7c7f25/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 1f753cb..23b14e1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,46 +18,10 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.apache.hadoop.ipc.RpcConstants.*;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.net.SocketFactory;
-import javax.security.sasl.Sasl;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -93,14 +57,25 @@ import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.Tracer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.CodedOutputStream;
+import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
+import java.io.*;
+import java.net.*;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -119,8 +94,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>>
-      RETURN_RPC_RESPONSE = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
+      = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -131,8 +106,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getReturnRpcResponse() {
-    return (Future<T>) RETURN_RPC_RESPONSE.get();
+  public static <T> Future<T> getAsyncRpcResponse() {
+    return (Future<T>) ASYNC_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -381,6 +356,11 @@ public class Client implements AutoCloseable {
       }
     }
 
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + id;
+    }
+
     /** Indicate when the call is complete and the
      * value or error are available.  Notifies by default.  */
     protected synchronized void callComplete() {
@@ -1414,27 +1394,32 @@ public class Client implements AutoCloseable {
     }
 
     if (isAsynchronousMode()) {
-      Future<Writable> returnFuture = new AbstractFuture<Writable>() {
-        private final AtomicBoolean callled = new AtomicBoolean(false);
+      final AsyncGet<Writable, IOException> asyncGet
+          = new AsyncGet<Writable, IOException>() {
         @Override
-        public Writable get() throws InterruptedException, ExecutionException {
-          if (callled.compareAndSet(false, true)) {
-            try {
-              set(getRpcResponse(call, connection));
-            } catch (IOException ie) {
-              setException(ie);
-            } finally {
+        public Writable get(long timeout, TimeUnit unit)
+            throws IOException, TimeoutException{
+          boolean done = true;
+          try {
+            final Writable w = getRpcResponse(call, connection, timeout, unit);
+            if (w == null) {
+              done = false;
+              throw new TimeoutException(call + " timed out "
+                  + timeout + " " + unit);
+            }
+            return w;
+          } finally {
+            if (done) {
               releaseAsyncCall();
             }
           }
-          return super.get();
         }
       };
 
-      RETURN_RPC_RESPONSE.set(returnFuture);
+      ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
       return null;
     } else {
-      return getRpcResponse(call, connection);
+      return getRpcResponse(call, connection, -1, null);
     }
   }
 
@@ -1470,12 +1455,18 @@ public class Client implements AutoCloseable {
     return asyncCallCounter.get();
   }
 
-  private Writable getRpcResponse(final Call call, final Connection connection)
-      throws IOException {
+  /** @return the rpc response or, in case of timeout, null. */
+  private Writable getRpcResponse(final Call call, final Connection connection,
+      final long timeout, final TimeUnit unit) throws IOException {
     synchronized (call) {
       while (!call.done) {
         try {
-          call.wait();                           // wait for the result
+          final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
+              timeout, unit);
+          call.wait(waitTimeout); // wait for the result
+          if (waitTimeout > 0 && !call.done) {
+            return null;
+          }
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           throw new InterruptedIOException("Call interrupted");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7c7f25/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 350e041..4641a67 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -18,21 +18,9 @@
 
 package org.apache.hadoop.ipc;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.SocketFactory;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.*;
+import com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -52,17 +40,23 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.GeneratedMessage;
-import com.google.protobuf.Message;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
+import javax.net.SocketFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * RPC Engine for for protobuf based RPCs.
@@ -70,8 +64,8 @@ import com.google.protobuf.TextFormat;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-  private static final ThreadLocal<Callable<?>>
-      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
+  private static final ThreadLocal<AsyncGet<Message, Exception>>
+      ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
 
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -81,10 +75,9 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
-  @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Callable<T> getReturnMessageCallback() {
-    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
+  public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
+    return ASYNC_RETURN_MESSAGE.get();
   }
 
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -263,14 +256,17 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
       
       if (Client.isAsynchronousMode()) {
-        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
-        Callable<Message> callback = new Callable<Message>() {
+        final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
+        final AsyncGet<Message, Exception> asyncGet
+            = new AsyncGet<Message, Exception>() {
           @Override
-          public Message call() throws Exception {
-            return getReturnMessage(method, frrw.get());
+          public Message get(long timeout, TimeUnit unit) throws Exception {
+            final RpcResponseWrapper rrw = timeout < 0?
+                frrw.get(): frrw.get(timeout, unit);
+            return getReturnMessage(method, rrw);
           }
         };
-        RETURN_MESSAGE_CALLBACK.set(callback);
+        ASYNC_RETURN_MESSAGE.set(asyncGet);
         return null;
       } else {
         return getReturnMessage(method, val);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7c7f25/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
new file mode 100644
index 0000000..5eac869
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
+ *
+ * When the return value is still being computed, invoking
+ * {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}.
+ * The method should be invoked again and again
+ * until the underlying computation is completed.
+ *
+ * @param <R> The type of the return value.
+ * @param <E> The exception type that the underlying implementation may throw.
+ */
+public interface AsyncGet<R, E extends Throwable> {
+  /**
+   * Get the result.
+   *
+   * @param timeout The maximum time period to wait.
+   *                When timeout == 0, it does not wait at all.
+   *                When timeout < 0, it waits indefinitely.
+   * @param unit The unit of the timeout value
+   * @return the result, which is possibly null.
+   * @throws E an exception thrown by the underlying implementation.
+   * @throws TimeoutException if it cannot return after the given time period.
+   * @throws InterruptedException if the thread is interrupted.
+   */
+  R get(long timeout, TimeUnit unit)
+      throws E, TimeoutException, InterruptedException;
+
+  /** Utility */
+  class Util {
+    /**
+     * @return {@link Object#wait(long)} timeout converted
+     *         from {@link #get(long, TimeUnit)} timeout.
+     */
+    public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
+      return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7c7f25/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
new file mode 100644
index 0000000..d687867
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.concurrent;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** A {@link Future} implemented using an {@link AsyncGet} object. */
+public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T> {
+  public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
+
+  private final AtomicBoolean called = new AtomicBoolean(false);
+  private final AsyncGet<T, E> asyncGet;
+
+  public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
+    this.asyncGet = asyncGet;
+  }
+
+  private void callAsyncGet(long timeout, TimeUnit unit) {
+    if (!isCancelled() && called.compareAndSet(false, true)) {
+      try {
+        set(asyncGet.get(timeout, unit));
+      } catch (TimeoutException te) {
+        LOG.trace("TRACE", te);
+        called.compareAndSet(true, false);
+      } catch (Throwable e) {
+        LOG.trace("TRACE", e);
+        setException(e);
+      }
+    }
+  }
+
+  @Override
+  public T get() throws InterruptedException, ExecutionException {
+    callAsyncGet(-1, TimeUnit.MILLISECONDS);
+    return super.get();
+  }
+
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, TimeoutException, ExecutionException {
+    callAsyncGet(timeout, unit);
+    return super.get(0, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public boolean isDone() {
+    callAsyncGet(0, TimeUnit.MILLISECONDS);
+    return super.isDone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7c7f25/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 8ee3a2c..7623975 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -18,20 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +34,17 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 public class TestAsyncIPC {
 
   private static Configuration conf;
@@ -87,26 +84,51 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+          Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
           returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {
-          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
+          throw new RuntimeException(e);
         }
       }
     }
 
-    public void waitForReturnValues() throws InterruptedException,
-        ExecutionException {
+    void assertReturnValues() throws InterruptedException, ExecutionException {
       for (int i = 0; i < count; i++) {
         LongWritable value = returnFutures.get(i).get();
-        if (expectedValues.get(i) != value.get()) {
-          LOG.fatal(String.format("Call-%d failed!", i));
-          failed = true;
-          break;
+        Assert.assertEquals("call" + i + " failed.",
+            expectedValues.get(i).longValue(), value.get());
+      }
+      Assert.assertFalse(failed);
+    }
+
+    void assertReturnValues(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException {
+      final boolean[] checked = new boolean[count];
+      for(boolean done = false; !done;) {
+        done = true;
+        for (int i = 0; i < count; i++) {
+          if (checked[i]) {
+            continue;
+          } else {
+            done = false;
+          }
+
+          final LongWritable value;
+          try {
+            value = returnFutures.get(i).get(timeout, unit);
+          } catch (TimeoutException e) {
+            LOG.info("call" + i + " caught ", e);
+            continue;
+          }
+
+          Assert.assertEquals("call" + i + " failed.",
+              expectedValues.get(i).longValue(), value.get());
+          checked[i] = true;
         }
       }
+      Assert.assertFalse(failed);
     }
   }
 
@@ -183,7 +205,7 @@ public class TestAsyncIPC {
 
     private void doCall(final int idx, final long param) throws IOException {
       TestIPC.call(client, param, server, conf);
-      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+      Future<LongWritable> returnFuture = Client.getAsyncRpcResponse();
       returnFutures.put(idx, returnFuture);
       expectedValues.put(idx, param);
     }
@@ -233,10 +255,7 @@ public class TestAsyncIPC {
     }
     for (int i = 0; i < callerCount; i++) {
       callers[i].join();
-      callers[i].waitForReturnValues();
-      String msg = String.format("Expected not failed for caller-%d: %s.", i,
-          callers[i]);
-      assertFalse(msg, callers[i].failed);
+      callers[i].assertReturnValues();
     }
     for (int i = 0; i < clientCount; i++) {
       clients[i].stop();
@@ -258,25 +277,37 @@ public class TestAsyncIPC {
     try {
       AsyncCaller caller = new AsyncCaller(client, addr, callCount);
       caller.run();
+      caller.assertReturnValues();
+      caller.assertReturnValues();
+      caller.assertReturnValues();
+      Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
 
-      caller.waitForReturnValues();
-      String msg = String.format(
-          "First time, expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
+  @Test(timeout = 60000)
+  public void testFutureGetWithTimeout() throws IOException,
+      InterruptedException, ExecutionException {
+//    GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
+    final Server server = new TestIPC.TestServer(10, true, conf);
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
 
-      caller.waitForReturnValues();
-      assertTrue(asyncCallCount == client.getAsyncCallCount());
-      msg = String.format("Second time, expected not failed for caller: %s.",
-          caller);
-      assertFalse(msg, caller.failed);
+    final Client client = new Client(LongWritable.class, conf);
 
-      assertTrue(asyncCallCount == client.getAsyncCallCount());
+    try {
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      caller.run();
+      caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
     } finally {
       client.stop();
       server.stop();
     }
   }
 
+
   public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
@@ -367,9 +398,7 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 4);
       caller.run();
-      caller.waitForReturnValues();
-      String msg = String.format("Expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
+      caller.assertReturnValues();
     } finally {
       client.stop();
       server.stop();
@@ -406,9 +435,7 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
-      caller.waitForReturnValues();
-      String msg = String.format("Expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
+      caller.assertReturnValues();
     } finally {
       client.stop();
       server.stop();
@@ -443,9 +470,7 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
-      caller.waitForReturnValues();
-      String msg = String.format("Expected not failed for caller: %s.", caller);
-      assertFalse(msg, caller.failed);
+      caller.assertReturnValues();
     } finally {
       client.stop();
       server.stop();
@@ -489,10 +514,7 @@ public class TestAsyncIPC {
       }
       for (int i = 0; i < callerCount; ++i) {
         callers[i].join();
-        callers[i].waitForReturnValues();
-        String msg = String.format("Expected not failed for caller-%d: %s.", i,
-            callers[i]);
-        assertFalse(msg, callers[i].failed);
+        callers[i].assertReturnValues();
       }
     } finally {
       client.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7c7f25/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 4fe0861..1f60df2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,20 +19,17 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.util.concurrent.AsyncGet;
+import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
 
-import com.google.common.util.concurrent.AbstractFuture;
-
 /****************************************************************
  * Implementation of the asynchronous distributed file system.
  * This instance of this class is the way end-user code interacts
@@ -52,22 +49,9 @@ public class AsyncDistributedFileSystem {
   }
 
   static <T> Future<T> getReturnValue() {
-    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
-        .getReturnValueCallback();
-    Future<T> returnFuture = new AbstractFuture<T>() {
-      private final AtomicBoolean called = new AtomicBoolean(false);
-      public T get() throws InterruptedException, ExecutionException {
-        if (called.compareAndSet(false, true)) {
-          try {
-            set(returnValueCallback.call());
-          } catch (Exception e) {
-            setException(e);
-          }
-        }
-        return super.get();
-      }
-    };
-    return returnFuture;
+    final AsyncGet<T, Exception> asyncGet
+        = ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue();
+    return new AsyncGetFuture<>(asyncGet);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7c7f25/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 94c6c0f..849f06d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,7 +24,8 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
-import java.util.concurrent.Callable;
+
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -190,6 +191,7 @@ import org.apache.hadoop.security.token.Token;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.util.concurrent.AsyncGet;
 
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@@ -201,8 +203,8 @@ import com.google.protobuf.ServiceException;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
-  private static final ThreadLocal<Callable<?>>
-      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
+  private static final ThreadLocal<AsyncGet<?, Exception>>
+      ASYNC_RETURN_VALUE = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -237,8 +239,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Callable<T> getReturnValueCallback() {
-    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
+  public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
+    return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
   }
 
   @Override
@@ -360,7 +362,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.setPermission(null, req);
-        setReturnValueCallback();
+        setAsyncReturnValue();
       } else {
         rpcProxy.setPermission(null, req);
       }
@@ -369,17 +371,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
-  private void setReturnValueCallback() {
-    final Callable<Message> returnMessageCallback = ProtobufRpcEngine
-        .getReturnMessageCallback();
-    Callable<Void> callBack = new Callable<Void>() {
+  private void setAsyncReturnValue() {
+    final AsyncGet<Message, Exception> asyncReturnMessage
+        = ProtobufRpcEngine.getAsyncReturnMessage();
+    final AsyncGet<Void, Exception> asyncGet
+        = new AsyncGet<Void, Exception>() {
       @Override
-      public Void call() throws Exception {
-        returnMessageCallback.call();
+      public Void get(long timeout, TimeUnit unit) throws Exception {
+        asyncReturnMessage.get(timeout, unit);
         return null;
       }
     };
-    RETURN_VALUE_CALLBACK.set(callBack);
+    ASYNC_RETURN_VALUE.set(asyncGet);
   }
 
   @Override
@@ -394,7 +397,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.setOwner(null, req.build());
-        setReturnValueCallback();
+        setAsyncReturnValue();
       } else {
         rpcProxy.setOwner(null, req.build());
       }
@@ -526,7 +529,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.rename2(null, req);
-        setReturnValueCallback();
+        setAsyncReturnValue();
       } else {
         rpcProxy.rename2(null, req);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to