HDFS-9945. Datanode command for evicting writers. Contributed by Kihwal Lee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aede8c10
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aede8c10
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aede8c10

Branch: refs/heads/HDFS-7240
Commit: aede8c10ecad4f2a8802a834e4bd0b8286cebade
Parents: 188f652
Author: Eric Payne <epa...@apache.org>
Authored: Wed Apr 6 20:20:14 2016 +0000
Committer: Eric Payne <epa...@apache.org>
Committed: Wed Apr 6 20:20:14 2016 +0000

----------------------------------------------------------------------
 .../hdfs/protocol/ClientDatanodeProtocol.java   |  7 +++
 .../ClientDatanodeProtocolTranslatorPB.java     | 12 +++++
 .../src/main/proto/ClientDatanodeProtocol.proto | 10 ++++
 ...tDatanodeProtocolServerSideTranslatorPB.java | 15 ++++++
 .../hdfs/server/datanode/BlockReceiver.java     |  3 ++
 .../hadoop/hdfs/server/datanode/DataNode.java   |  7 +++
 .../hdfs/server/datanode/DataXceiver.java       | 48 +++++++++++++++----
 .../hdfs/server/datanode/DataXceiverServer.java |  6 +++
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  | 21 +++++++++
 .../TestClientProtocolForPipelineRecovery.java  | 49 ++++++++++++++++++++
 10 files changed, 170 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
index 08547c1..e541388 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -122,6 +122,13 @@ public interface ClientDatanodeProtocol {
   void shutdownDatanode(boolean forUpgrade) throws IOException;
 
   /**
+   * Evict clients that are writing to a datanode.
+   *
+   * @throws IOException
+   */
+  void evictWriters() throws IOException;
+
+  /**
    * Obtains datanode info
    *
    * @return software/config version and uptime of the datanode

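For orientation, here is a minimal caller-side sketch of the new API, assuming
a ClientDatanodeProtocol proxy to the target datanode has already been built
(for example, the way DFSAdmin's getDataNodeProxy does further down). The
class and method names in this sketch are illustrative, not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;

    public class EvictWritersSketch {
      // Assumption: the caller supplies a proxy to the target datanode.
      // The RPC takes no arguments and requires superuser privilege on
      // the datanode (see DataNode#evictWriters below).
      static void evict(ClientDatanodeProtocol dnProxy) throws IOException {
        dnProxy.evictWriters();
      }
    }
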
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 2fffffd..6aaa025 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -97,6 +98,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
   private static final GetBalancerBandwidthRequestProto
       VOID_GET_BALANCER_BANDWIDTH =
       GetBalancerBandwidthRequestProto.newBuilder().build();
+  private final static EvictWritersRequestProto VOID_EVICT_WRITERS =
+      EvictWritersRequestProto.newBuilder().build();
 
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
       Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -244,6 +247,15 @@ public class ClientDatanodeProtocolTranslatorPB implements
   }
 
   @Override
+  public void evictWriters() throws IOException {
+    try {
+      rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public DatanodeLocalInfo getDatanodeInfo() throws IOException {
     GetDatanodeInfoResponseProto response;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index 954fedc..e135df8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -114,6 +114,13 @@ message ShutdownDatanodeRequestProto {
 message ShutdownDatanodeResponseProto {
 }
 
+/** Tell datanode to evict active clients that are writing */
+message EvictWritersRequestProto {
+}
+
+message EvictWritersResponseProto {
+}
+
 /**
  * Ping datanode for liveness and quick info
  */
@@ -176,6 +183,9 @@ service ClientDatanodeProtocolService {
   rpc shutdownDatanode(ShutdownDatanodeRequestProto)
       returns(ShutdownDatanodeResponseProto);
 
+  rpc evictWriters(EvictWritersRequestProto)
+      returns(EvictWritersResponseProto);
+
   rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
       returns(GetDatanodeInfoResponseProto);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 0feecc1..e0401f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.EvictWritersResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
@@ -67,6 +69,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       StartReconfigurationResponseProto.newBuilder().build();
   private final static TriggerBlockReportResponseProto TRIGGER_BLOCK_REPORT_RESP =
       TriggerBlockReportResponseProto.newBuilder().build();
+  private final static EvictWritersResponseProto EVICT_WRITERS_RESP =
+      EvictWritersResponseProto.newBuilder().build();
   
   private final ClientDatanodeProtocol impl;
 
@@ -142,6 +146,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
     return SHUTDOWN_DATANODE_RESP;
   }
 
+  @Override
+  public EvictWritersResponseProto evictWriters(RpcController unused,
+      EvictWritersRequestProto request) throws ServiceException {
+    try {
+      impl.evictWriters();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return EVICT_WRITERS_RESP;
+  }
+
   public GetDatanodeInfoResponseProto getDatanodeInfo(RpcController unused,
       GetDatanodeInfoRequestProto request) throws ServiceException {
     GetDatanodeInfoResponseProto res;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index fb0c1c5..8f9138c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -889,6 +889,9 @@ class BlockReceiver implements Closeable {
   }
   
   public void sendOOB() throws IOException, InterruptedException {
+    if (isDatanode) {
+      return;
+    }
     ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
         .getRestartOOBStatus());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 989afbe..625eb3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2974,6 +2974,13 @@ public class DataNode extends ReconfigurableBase
   }
 
   @Override //ClientDatanodeProtocol
+  public void evictWriters() throws IOException {
+    checkSuperuserPrivilege();
+    LOG.info("Evicting all writers.");
+    xserver.stopWriters();
+  }
+
+  @Override //ClientDatanodeProtocol
   public DatanodeLocalInfo getDatanodeInfo() {
     long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000;
     return new DatanodeLocalInfo(VersionInfo.getVersion(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 63bf5ae..d5dc328 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -116,6 +116,7 @@ class DataXceiver extends Receiver implements Runnable {
   private BlockReceiver blockReceiver = null;
   private final int ioFileBufferSize;
   private final int smallBufferSize;
+  private Thread xceiver = null;
 
   /**
    * Client Name used in previous operation. Not available on first request
@@ -178,9 +179,38 @@ class DataXceiver extends Receiver implements Runnable {
   }
 
   public void sendOOB() throws IOException, InterruptedException {
+    BlockReceiver br = getCurrentBlockReceiver();
+    if (br == null) {
+      return;
+    }
+    // This doesn't need to be in a critical section. Although the client
+    // can reuse the connection to issue a different request, trying to send
+    // an OOB through the recently closed block receiver is harmless.
     LOG.info("Sending OOB to peer: " + peer);
-    if(blockReceiver!=null)
-      blockReceiver.sendOOB();
+    br.sendOOB();
+  }
+
+  public void stopWriter() {
+    // We want to interrupt the xceiver only when it is serving writes.
+    synchronized(this) {
+      if (getCurrentBlockReceiver() == null) {
+        return;
+      }
+      xceiver.interrupt();
+    }
+    LOG.info("Stopped the writer: " + peer);
+  }
+
+  /**
+   * blockReceiver is updated at multiple places. Use the synchronized setter
+   * and getter.
+   */
+  private synchronized void setCurrentBlockReceiver(BlockReceiver br) {
+    blockReceiver = br;
+  }
+
+  private synchronized BlockReceiver getCurrentBlockReceiver() {
+    return blockReceiver;
   }
   
   /**
@@ -192,6 +222,9 @@ class DataXceiver extends Receiver implements Runnable {
     Op op = null;
 
     try {
+      synchronized(this) {
+        xceiver = Thread.currentThread();
+      }
       dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
@@ -679,12 +712,12 @@ class DataXceiver extends Receiver implements Runnable {
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
-        blockReceiver = getBlockReceiver(block, storageType, in,
+        setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist, pinning);
+            cachingStrategy, allowLazyPersist, pinning));
         replica = blockReceiver.getReplica();
       } else {
         replica = datanode.data.recoverClose(
@@ -853,7 +886,7 @@ class DataXceiver extends Receiver implements Runnable {
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
-      blockReceiver = null;
+      setCurrentBlockReceiver(null);
     }
 
     //update metrics
@@ -1060,7 +1093,6 @@ class DataXceiver extends Receiver implements Runnable {
     DataOutputStream proxyOut = null;
     Status opStatus = SUCCESS;
     String errMsg = null;
-    BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     boolean IoeDuringCopyBlockOperation = false;
     try {
@@ -1119,11 +1151,11 @@ class DataXceiver extends Receiver implements Runnable {
         DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
             checksumInfo.getChecksum());
         // open a block receiver and check if the block does not exist
-        blockReceiver = getBlockReceiver(block, storageType,
+        setCurrentBlockReceiver(getBlockReceiver(block, storageType,
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
             null, 0, 0, 0, "", null, datanode, remoteChecksum,
-            CachingStrategy.newDropBehind(), false, false);
+            CachingStrategy.newDropBehind(), false, false));
         
         // receive a block
         blockReceiver.receiveBlock(null, null, replyOut, null, 

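To make the eviction mechanism above concrete: stopWriter() relies on plain
Thread#interrupt, which the xceiver thread observes as an exception from its
blocking I/O, after which the client rebuilds the pipeline around the evicted
node (as the new test below verifies). A self-contained toy illustration of
that pattern, not HDFS code, with a sleep standing in for a blocking packet
read:

    public class InterruptEvictionSketch {
      public static void main(String[] args) throws Exception {
        Thread writer = new Thread(() -> {
          try {
            Thread.sleep(60_000); // stands in for blocking on the next packet
          } catch (InterruptedException e) {
            System.out.println("writer evicted: " + e);
          }
        });
        writer.start();
        Thread.sleep(100);  // let the writer block first
        writer.interrupt(); // what DataXceiver#stopWriter does to the xceiver
        writer.join();
      }
    }
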
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 10945e7..126d5b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -256,6 +256,12 @@ class DataXceiverServer implements Runnable {
       }
     }
   }
+
+  public synchronized void stopWriters() {
+    for (Peer p : peers.keySet()) {
+      peersXceiver.get(p).stopWriter();
+    }
+  }
   
   // Notify all peers of the shutdown and restart.
   // datanode.shouldRun should still be true and datanode.restarting should

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index d84d664..a35246f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -1090,6 +1090,10 @@ public class DFSAdmin extends FsShell {
         + "\tclients will timeout and ignore the datanode. In such case, the\n"
         + "\tfast start-up mode will also be disabled.\n";
 
+    String evictWriters = "-evictWriters <datanode_host:ipc_port>\n"
+        + "\tMake the datanode evict all clients that are writing a block.\n"
+        + "\tThis is useful if decommissioning is hung due to slow writers.\n";
+
     String getDatanodeInfo = "-getDatanodeInfo <datanode_host:ipc_port>\n"
         + "\tGet the information about the given datanode. This command can\n"
         + "\tbe used for checking if a datanode is alive.\n";
@@ -1159,6 +1163,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(disallowSnapshot);
     } else if ("shutdownDatanode".equalsIgnoreCase(cmd)) {
       System.out.println(shutdownDatanode);
+    } else if ("evictWriters".equalsIgnoreCase(cmd)) {
+      System.out.println(evictWriters);
     } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
       System.out.println(getDatanodeInfo);
     } else if ("help".equals(cmd)) {
@@ -1193,6 +1199,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(allowSnapshot);
       System.out.println(disallowSnapshot);
       System.out.println(shutdownDatanode);
+      System.out.println(evictWriters);
       System.out.println(getDatanodeInfo);
       System.out.println(triggerBlockReport);
       System.out.println(help);
@@ -2047,6 +2054,8 @@ public class DFSAdmin extends FsShell {
         exitCode = fetchImage(argv, i);
       } else if ("-shutdownDatanode".equals(cmd)) {
         exitCode = shutdownDatanode(argv, i);
+      } else if ("-evictWriters".equals(cmd)) {
+        exitCode = evictWriters(argv, i);
       } else if ("-getDatanodeInfo".equals(cmd)) {
         exitCode = getDatanodeInfo(argv, i);
       } else if ("-reconfig".equals(cmd)) {
@@ -2171,6 +2180,18 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  private int evictWriters(String[] argv, int i) throws IOException {
+    final String dn = argv[i];
+    ClientDatanodeProtocol dnProxy = getDataNodeProxy(dn);
+    try {
+      dnProxy.evictWriters();
+      System.out.println("Requested writer eviction to datanode " + dn);
+    } catch (IOException ioe) {
+      return -1;
+    }
+    return 0;
+  }
+
   private int getDatanodeInfo(String[] argv, int i) throws IOException {
     ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
     try {

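Usage sketch for the new subcommand (host and port are placeholders):

    $ hdfs dfsadmin -evictWriters datanode_host:ipc_port
    Requested writer eviction to datanode datanode_host:ipc_port

The second line is the success message printed by evictWriters() above; if
the RPC fails with an IOException, the command silently returns -1 instead.
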
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aede8c10/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 0eeb3b7..5e320fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -271,6 +271,55 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
 
+  /**
+   * Test that the writer is kicked out of a node.
+   */
+  @Test
+  public void testEvictWriter() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes((int)3)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("testEvictWriter.dat");
+      FSDataOutputStream out = fs.create(file, (short)2);
+      out.write(0x31);
+      out.hflush();
+
+      // get nodes in the pipeline
+      DFSOutputStream dfsOut = (DFSOutputStream)out.getWrappedStream();
+      DatanodeInfo[] nodes = dfsOut.getPipeline();
+      Assert.assertEquals(2, nodes.length);
+      String dnAddr = nodes[1].getIpcAddr(false);
+
+      // evict the writer from the second datanode and wait until
+      // the pipeline is rebuilt.
+      DFSAdmin dfsadmin = new DFSAdmin(conf);
+      final String[] args1 = {"-evictWriters", dnAddr };
+      Assert.assertEquals(0, dfsadmin.run(args1));
+      out.write(0x31);
+      out.hflush();
+
+      // get the new pipeline and check that the node is not in it.
+      nodes = dfsOut.getPipeline();
+      try {
+        Assert.assertTrue(nodes.length > 0 );
+        for (int i = 0; i < nodes.length; i++) {
+          Assert.assertFalse(dnAddr.equals(nodes[i].getIpcAddr(false)));
+        }
+      } finally {
+        out.close();
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /** Test restart timeout */
   @Test
   public void testPipelineRecoveryOnRestartFailure() throws Exception {
