Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem.  
Contributed by Xiaobing Zhou"

This reverts commit fc94810d3f537e51e826fc21ade7867892b9d8dc.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/106234d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/106234d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/106234d8

Branch: refs/heads/HDFS-1312
Commit: 106234d873c60fa52cd0d812fb1cdc0c6b998a6d
Parents: 4d36b22
Author: Andrew Wang <w...@apache.org>
Authored: Fri Jun 3 18:09:55 2016 -0700
Committer: Andrew Wang <w...@apache.org>
Committed: Fri Jun 3 18:09:55 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileSystem.java   |   1 +
 .../main/java/org/apache/hadoop/ipc/Client.java |  11 +-
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  34 +--
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |   2 +-
 .../hadoop/hdfs/AsyncDistributedFileSystem.java | 110 --------
 .../hadoop/hdfs/DistributedFileSystem.java      |  22 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  45 +---
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 258 -------------------
 8 files changed, 20 insertions(+), 463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/106234d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 9e13a7a..0ecd8b7 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1252,6 +1252,7 @@ public abstract class FileSystem extends Configured 
implements Closeable {
   /**
    * Renames Path src to Path dst
    * <ul>
+   * <li
    * <li>Fails if src is a file and dst is a directory.
    * <li>Fails if src is a directory and dst is a file.
    * <li>Fails if the parent of dst does not exist or is a file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/106234d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d59aeb89..f206861 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -119,8 +119,7 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new 
ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new 
ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>>
-      RETURN_RPC_RESPONSE = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>> returnValue = new 
ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -131,8 +130,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getReturnRpcResponse() {
-    return (Future<T>) RETURN_RPC_RESPONSE.get();
+  public static <T> Future<T> getReturnValue() {
+    return (Future<T>) returnValue.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1397,7 +1396,7 @@ public class Client implements AutoCloseable {
         }
       };
 
-      RETURN_RPC_RESPONSE.set(returnFuture);
+      returnValue.set(returnFuture);
       return null;
     } else {
       return getRpcResponse(call, connection);
@@ -1411,7 +1410,7 @@ public class Client implements AutoCloseable {
    *          synchronous mode.
    */
   @Unstable
-  public static boolean isAsynchronousMode() {
+  static boolean isAsynchronousMode() {
     return asynchronousMode.get();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/106234d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 8fcdb78..071e2e8 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -26,9 +26,7 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputOutputStream;
 import org.apache.hadoop.io.Writable;
@@ -70,9 +67,7 @@ import com.google.protobuf.TextFormat;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-  private static final ThreadLocal<Callable<?>>
-      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
-
+  
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
         RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
@@ -81,12 +76,6 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
-  @SuppressWarnings("unchecked")
-  @Unstable
-  public static <T> Callable<T> getReturnMessageCallback() {
-    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
-  }
-
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {
@@ -200,7 +189,7 @@ public class ProtobufRpcEngine implements RpcEngine {
      * the server.
      */
     @Override
-    public Object invoke(Object proxy, final Method method, Object[] args)
+    public Object invoke(Object proxy, Method method, Object[] args)
         throws ServiceException {
       long startTime = 0;
       if (LOG.isDebugEnabled()) {
@@ -262,23 +251,6 @@ public class ProtobufRpcEngine implements RpcEngine {
         LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
       }
       
-      if (Client.isAsynchronousMode()) {
-        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
-        Callable<Message> callback = new Callable<Message>() {
-          @Override
-          public Message call() throws Exception {
-            return getReturnMessage(method, frrw.get());
-          }
-        };
-        RETURN_MESSAGE_CALLBACK.set(callback);
-        return null;
-      } else {
-        return getReturnMessage(method, val);
-      }
-    }
-
-    private Message getReturnMessage(final Method method,
-        final RpcResponseWrapper rrw) throws ServiceException {
       Message prototype = null;
       try {
         prototype = getReturnProtoType(method);
@@ -288,7 +260,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       Message returnMessage;
       try {
         returnMessage = prototype.newBuilderForType()
-            .mergeFrom(rrw.theResponseRead).build();
+            .mergeFrom(val.theResponseRead).build();
 
         if (LOG.isTraceEnabled()) {
           LOG.trace(Thread.currentThread().getId() + ": Response <- " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/106234d8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 6cf75c7..de4395e 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -84,7 +84,7 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+          Future<LongWritable> returnFuture = Client.getReturnValue();
           returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/106234d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
deleted file mode 100644
index 37899aa..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.ipc.Client;
-
-import com.google.common.util.concurrent.AbstractFuture;
-
-/****************************************************************
- * Implementation of the asynchronous distributed file system.
- * This instance of this class is the way end-user code interacts
- * with a Hadoop DistributedFileSystem in an asynchronous manner.
- *
- *****************************************************************/
-@Unstable
-public class AsyncDistributedFileSystem {
-
-  private final DistributedFileSystem dfs;
-
-  AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
-    this.dfs = dfs;
-  }
-
-  static <T> Future<T> getReturnValue() {
-    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
-        .getReturnValueCallback();
-    Future<T> returnFuture = new AbstractFuture<T>() {
-      public T get() throws InterruptedException, ExecutionException {
-        try {
-          set(returnValueCallback.call());
-        } catch (Exception e) {
-          setException(e);
-        }
-        return super.get();
-      }
-    };
-    return returnFuture;
-  }
-
-  /**
-   * Renames Path src to Path dst
-   * <ul>
-   * <li>Fails if src is a file and dst is a directory.
-   * <li>Fails if src is a directory and dst is a file.
-   * <li>Fails if the parent of dst does not exist or is a file.
-   * </ul>
-   * <p>
-   * If OVERWRITE option is not passed as an argument, rename fails if the dst
-   * already exists.
-   * <p>
-   * If OVERWRITE option is passed as an argument, rename overwrites the dst if
-   * it is a file or an empty directory. Rename fails if dst is a non-empty
-   * directory.
-   * <p>
-   * Note that atomicity of rename is dependent on the file system
-   * implementation. Please refer to the file system documentation for details.
-   * This default implementation is non atomic.
-   *
-   * @param src
-   *          path to be renamed
-   * @param dst
-   *          new path after rename
-   * @throws IOException
-   *           on failure
-   * @return an instance of Future, #get of which is invoked to wait for
-   *         asynchronous call being finished.
-   */
-  public Future<Void> rename(Path src, Path dst,
-      final Options.Rename... options) throws IOException {
-    dfs.getFsStatistics().incrementWriteOps(1);
-
-    final Path absSrc = dfs.fixRelativePart(src);
-    final Path absDst = dfs.fixRelativePart(dst);
-
-    final boolean isAsync = Client.isAsynchronousMode();
-    Client.setAsynchronousMode(true);
-    try {
-      dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
-          options);
-      return getReturnValue();
-    } finally {
-      Client.setAsynchronousMode(isAsync);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/106234d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 0ae4d70..5e54edd 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,7 +31,6 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
@@ -205,7 +204,7 @@ public class DistributedFileSystem extends FileSystem {
    * @return path component of {file}
    * @throws IllegalArgumentException if URI does not belong to this DFS
    */
-  String getPathName(Path file) {
+  private String getPathName(Path file) {
     checkPath(file);
     String result = file.toUri().getPath();
     if (!DFSUtilClient.isValidName(result)) {
@@ -2510,23 +2509,4 @@ public class DistributedFileSystem extends FileSystem {
     }
     return ret;
   }
-
-  private final AsyncDistributedFileSystem adfs =
-      new AsyncDistributedFileSystem(this);
-
-  /** @return an {@link AsyncDistributedFileSystem} object. */
-  @Unstable
-  public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
-    return adfs;
-  }
-
-  @Override
-  protected Path fixRelativePart(Path p) {
-    return super.fixRelativePart(p);
-  }
-
-  Statistics getFsStatistics() {
-    return statistics;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/106234d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index f4074b6..513a5e3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,14 +24,11 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
-import java.util.concurrent.Callable;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
-import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -58,7 +55,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -139,6 +135,7 @@ import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Recove
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
+import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@@ -156,15 +153,13 @@ import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
-import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
-import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
+import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.*;
 import 
org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
-import 
org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
@@ -182,9 +177,8 @@ import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -196,9 +190,12 @@ import 
org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
 
+import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
+    .EncryptionZoneProto;
+
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
  * while translating from the parameter types used in ClientProtocol to the
@@ -209,8 +206,6 @@ import com.google.protobuf.ServiceException;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
-  private static final ThreadLocal<Callable<?>>
-      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -244,12 +239,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
     rpcProxy = proxy;
   }
 
-  @SuppressWarnings("unchecked")
-  @Unstable
-  public static <T> Callable<T> getReturnValueCallback() {
-    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
-  }
-
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -486,7 +475,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
     RenameRequestProto req = RenameRequestProto.newBuilder()
         .setSrc(src)
         .setDst(dst).build();
-
     try {
       return rpcProxy.rename(null, req).getResult();
     } catch (ServiceException e) {
@@ -511,22 +499,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         setDst(dst).setOverwriteDest(overwrite).
         build();
     try {
-      if (Client.isAsynchronousMode()) {
-        rpcProxy.rename2(null, req);
-
-        final Callable<Message> returnMessageCallback = ProtobufRpcEngine
-            .getReturnMessageCallback();
-        Callable<Void> callBack = new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            returnMessageCallback.call();
-            return null;
-          }
-        };
-        RETURN_VALUE_CALLBACK.set(callBack);
-      } else {
-        rpcProxy.rename2(null, req);
-      }
+      rpcProxy.rename2(null, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/106234d8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
deleted file mode 100644
index 9322e1a..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestAsyncDFSRename {
-  final Path asyncRenameDir = new Path("/test/async_rename/");
-  public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
-  final private static Configuration CONF = new HdfsConfiguration();
-
-  final private static String GROUP1_NAME = "group1";
-  final private static String GROUP2_NAME = "group2";
-  final private static String USER1_NAME = "user1";
-  private static final UserGroupInformation USER1;
-
-  private MiniDFSCluster gCluster;
-
-  static {
-    // explicitly turn on permission checking
-    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
-    // create fake mapping for the groups
-    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
-    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
-    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
-    // Initiate all four users
-    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] 
{
-        GROUP1_NAME, GROUP2_NAME });
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
-    gCluster.waitActive();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    if (gCluster != null) {
-      gCluster.shutdown();
-      gCluster = null;
-    }
-  }
-
-  static int countLease(MiniDFSCluster cluster) {
-    return TestDFSRename.countLease(cluster);
-  }
-
-  void list(DistributedFileSystem dfs, String name) throws IOException {
-    FileSystem.LOG.info("\n\n" + name);
-    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
-      FileSystem.LOG.info("" + s.getPath());
-    }
-  }
-
-  static void createFile(DistributedFileSystem dfs, Path f) throws IOException 
{
-    DataOutputStream a_out = dfs.create(f);
-    a_out.writeBytes("something");
-    a_out.close();
-  }
-
-  /**
-   * Check the blocks of dst file are cleaned after rename with overwrite
-   * Restart NN to check the rename successfully
-   */
-  @Test
-  public void testAsyncRenameWithOverwrite() throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
-    Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
-        replFactor).build();
-    cluster.waitActive();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-
-    try {
-
-      long fileLen = blockSize * 3;
-      String src = "/foo/src";
-      String dst = "/foo/dst";
-      String src2 = "/foo/src2";
-      String dst2 = "/foo/dst2";
-      Path srcPath = new Path(src);
-      Path dstPath = new Path(dst);
-      Path srcPath2 = new Path(src2);
-      Path dstPath2 = new Path(dst2);
-
-      DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
-      DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
-
-      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
-          cluster.getNameNode(), dst, 0, fileLen);
-      LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
-          cluster.getNameNode(), dst2, 0, fileLen);
-      BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
-          .getBlockManager();
-      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) != null);
-      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) != null);
-
-      Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
-      Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
-      retVal1.get();
-      retVal2.get();
-
-      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) == null);
-      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
-          .getLocalBlock()) == null);
-
-      // Restart NN and check the rename successfully
-      cluster.restartNameNodes();
-      assertFalse(dfs.exists(srcPath));
-      assertTrue(dfs.exists(dstPath));
-      assertFalse(dfs.exists(srcPath2));
-      assertTrue(dfs.exists(dstPath2));
-    } finally {
-      if (dfs != null) {
-        dfs.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  @Test
-  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
-    final short replFactor = 2;
-    final long blockSize = 512;
-    final Path renameDir = new Path(
-        "/test/concurrent_reanme_with_overwrite_dir/");
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
-        .build();
-    cluster.waitActive();
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
-    int count = 1000;
-
-    try {
-      long fileLen = blockSize * 3;
-      assertTrue(dfs.mkdirs(renameDir));
-
-      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, 
Future<Void>>();
-
-      // concurrently invoking many rename
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
-        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
-        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-        returnFutures.put(i, returnFuture);
-      }
-
-      // wait for completing the calls
-      for (int i = 0; i < count; i++) {
-        returnFutures.get(i).get();
-      }
-
-      // Restart NN and check the rename successfully
-      cluster.restartNameNodes();
-
-      // very the src dir should not exist, dst should
-      for (int i = 0; i < count; i++) {
-        Path src = new Path(renameDir, "src" + i);
-        Path dst = new Path(renameDir, "dst" + i);
-        assertFalse(dfs.exists(src));
-        assertTrue(dfs.exists(dst));
-      }
-    } finally {
-      dfs.delete(renameDir, true);
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
-  @Test
-  public void testAsyncRenameWithException() throws Exception {
-    FileSystem rootFs = FileSystem.get(CONF);
-    final Path renameDir = new Path("/test/async_rename_exception/");
-    final Path src = new Path(renameDir, "src");
-    final Path dst = new Path(renameDir, "dst");
-    rootFs.mkdirs(src);
-
-    AsyncDistributedFileSystem adfs = USER1
-        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
-          @Override
-          public AsyncDistributedFileSystem run() throws Exception {
-            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
-          }
-        });
-
-    try {
-      Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
-      returnFuture.get();
-    } catch (ExecutionException e) {
-      checkPermissionDenied(e, src);
-    }
-  }
-
-  private void checkPermissionDenied(final Exception e, final Path dir) {
-    assertTrue(e.getCause() instanceof ExecutionException);
-    assertTrue("Permission denied messages must carry AccessControlException",
-        e.getMessage().contains("AccessControlException"));
-    assertTrue("Permission denied messages must carry the username", e
-        .getMessage().contains(USER1_NAME));
-    assertTrue("Permission denied messages must carry the path parent", e
-        .getMessage().contains(dir.getParent().toUri().getPath()));
-  }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to