Repository: hadoop
Updated Branches:
  refs/heads/branch-2 54e33baaf -> 005e1df54


HDFS-7703. Support favouredNodes for the append for new blocks (Contributed by Vinayakumar B)

(cherry picked from commit 89a544928083501625bc69f96b530040228f0a5f)

Conflicts:
        hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91a5d929
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91a5d929
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91a5d929

Branch: refs/heads/branch-2
Commit: 91a5d92916c6c1b0475d5794c3855b53b020d4ec
Parents: 54e33ba
Author: Vinayakumar B <vinayakum...@apache.org>
Authored: Thu Feb 12 12:38:44 2015 +0530
Committer: Vinayakumar B <vinayakum...@apache.org>
Committed: Tue Feb 17 15:19:03 2015 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 53 +++++++++++++++-----
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  7 ++-
 .../hadoop/hdfs/DistributedFileSystem.java      | 43 ++++++++++++++++
 .../namenode/TestFavoredNodesEndToEnd.java      | 29 +++++++++++
 5 files changed, 121 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
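
For reference, a minimal client-side sketch of the API this patch adds. The
configuration, file path, and datanode addresses below are hypothetical, and
favored nodes are placement hints only: the hint applies to new blocks
allocated during the append, and placement falls back to the default policy
if the hint cannot be satisfied.

  import java.net.InetSocketAddress;
  import java.util.EnumSet;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.CreateFlag;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.DistributedFileSystem;

  public class AppendWithFavoredNodes {
    public static void main(String[] args) throws Exception {
      // Assumes fs.defaultFS points at an HDFS cluster.
      Configuration conf = new Configuration();
      DistributedFileSystem dfs =
          (DistributedFileSystem) FileSystem.get(conf);
      // Hypothetical datanodes to favor for blocks written by this append.
      InetSocketAddress[] favoredNodes = {
          new InetSocketAddress("dn1.example.com", 50010),
          new InetSocketAddress("dn2.example.com", 50010),
          new InetSocketAddress("dn3.example.com", 50010)
      };
      // CreateFlag.NEW_BLOCK could be added to force the appended data onto
      // a fresh block; plain APPEND suffices, as in the new test below.
      FSDataOutputStream out = dfs.append(new Path("/existing/file"),
          EnumSet.of(CreateFlag.APPEND), 4096, null, favoredNodes);
      try {
        out.write("more data".getBytes("UTF-8"));
      } finally {
        out.close();
      }
    }
  }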


http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5d929/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6bfa34c..a1b2053 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -324,6 +324,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7761. cleanup unnecssary code logic in LocatedBlock. (yliu)
 
+    HDFS-7703. Support favouredNodes for the append for new blocks
+    (vinayakumarb)
+
     HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
 
     HDFS-7684. The host:port settings of the daemons should be trimmed before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5d929/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index d27197f..3c0ec99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1693,6 +1693,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
+    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
+        src, masked, flag, createParent, replication, blockSize, progress,
+        buffersize, dfsClientConf.createChecksum(checksumOpt),
+        getFavoredNodesStr(favoredNodes));
+    beginFileLease(result.getFileId(), result);
+    return result;
+  }
+
+  private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) {
     String[] favoredNodeStrs = null;
     if (favoredNodes != null) {
       favoredNodeStrs = new String[favoredNodes.length];
@@ -1702,12 +1711,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
                          + favoredNodes[i].getPort();
       }
     }
-    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
-        src, masked, flag, createParent, replication, blockSize, progress,
-        buffersize, dfsClientConf.createChecksum(checksumOpt),
-        favoredNodeStrs);
-    beginFileLease(result.getFileId(), result);
-    return result;
+    return favoredNodeStrs;
   }
   
   /**
@@ -1725,7 +1729,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         }
         return null;
       }
-      return callAppend(src, buffersize, flag, progress);
+      return callAppend(src, buffersize, flag, progress, null);
     }
     return null;
   }
@@ -1804,7 +1808,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
   /** Method to get stream returned by append call */
   private DFSOutputStream callAppend(String src, int buffersize,
-      EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
+      EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
+      throws IOException {
     CreateFlag.validateForAppend(flag);
     try {
       LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
@@ -1812,7 +1817,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       return DFSOutputStream.newStreamForAppend(this, src,
           flag.contains(CreateFlag.NEW_BLOCK),
           buffersize, progress, blkWithStatus.getLastBlock(),
-          blkWithStatus.getFileStatus(), dfsClientConf.createChecksum());
+          blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), favoredNodes);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      FileNotFoundException.class,
@@ -1840,14 +1845,38 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public HdfsDataOutputStream append(final String src, final int buffersize,
       EnumSet<CreateFlag> flag, final Progressable progress,
       final FileSystem.Statistics statistics) throws IOException {
-    final DFSOutputStream out = append(src, buffersize, flag, progress);
+    final DFSOutputStream out = append(src, buffersize, flag, null, progress);
+    return createWrappedOutputStream(out, statistics, out.getInitialLen());
+  }
+
+  /**
+   * Append to an existing HDFS file.
+   * 
+   * @param src file name
+   * @param buffersize buffer size
+   * @param flag indicates whether to append data to a new block instead of the
+   *          last block
+   * @param progress for reporting write-progress; null is acceptable.
+   * @param statistics file system statistics; null is acceptable.
+   * @param favoredNodes FavoredNodes for new blocks
+   * @return an output stream for writing into the file
+   * @see ClientProtocol#append(String, String, EnumSetWritable)
+   */
+  public HdfsDataOutputStream append(final String src, final int buffersize,
+      EnumSet<CreateFlag> flag, final Progressable progress,
+      final FileSystem.Statistics statistics,
+      final InetSocketAddress[] favoredNodes) throws IOException {
+    final DFSOutputStream out = append(src, buffersize, flag,
+        getFavoredNodesStr(favoredNodes), progress);
     return createWrappedOutputStream(out, statistics, out.getInitialLen());
   }
 
   private DFSOutputStream append(String src, int buffersize,
-      EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
+      EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress)
+      throws IOException {
     checkOpen();
-    final DFSOutputStream result = callAppend(src, buffersize, flag, progress);
+    final DFSOutputStream result = callAppend(src, buffersize, flag, progress,
+        favoredNodes);
     beginFileLease(result.getFileId(), result);
     return result;
   }
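
The refactor in the first two hunks pulls the InetSocketAddress-to-string
conversion out of create() into a shared getFavoredNodesStr() helper so the
append path can reuse it. A hedged one-line illustration of the conversion
(the accessor for the host part is cut off by the hunk context above, so
getHostName() is an assumption):

  // Illustration only: roughly what getFavoredNodesStr does per address.
  InetSocketAddress addr = new InetSocketAddress("dn1.example.com", 50010);  // hypothetical
  String favoredNodeStr = addr.getHostName() + ":" + addr.getPort();  // "dn1.example.com:50010"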

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5d929/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 47b6f36..3ed957b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1822,10 +1822,13 @@ public class DFSOutputStream extends FSOutputSummer
 
   static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
       boolean toNewBlock, int bufferSize, Progressable progress,
-      LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum)
-      throws IOException {
+      LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
+      String[] favoredNodes) throws IOException {
     final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
         progress, lastBlock, stat, checksum);
+    if (favoredNodes != null && favoredNodes.length != 0) {
+      out.streamer.setFavoredNodes(favoredNodes);
+    }
     out.start();
     return out;
   }
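
Note the guard in the hunk above: the favored-node hint reaches the stream's
DataStreamer only when the array is non-null and non-empty, so existing
append callers (which pass null) see no behavior change.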

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5d929/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 2cecdfb..136ef15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -317,6 +317,17 @@ public class DistributedFileSystem extends FileSystem {
     return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
   }
 
+  /**
+   * Append to an existing file (optional operation).
+   * 
+   * @param f the existing file to be appended.
+   * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
+   *          to be present.
+   * @param bufferSize the size of the buffer to be used.
+   * @param progress for reporting progress if it is not null.
+   * @return Returns instance of {@link FSDataOutputStream}
+   * @throws IOException
+   */
   public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
       final int bufferSize, final Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
@@ -336,6 +347,38 @@ public class DistributedFileSystem extends FileSystem {
     }.resolve(this, absF);
   }
 
+  /**
+   * Append to an existing file (optional operation).
+   * 
+   * @param f the existing file to be appended.
+   * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
+   *          to be present.
+   * @param bufferSize the size of the buffer to be used.
+   * @param progress for reporting progress if it is not null.
+   * @param favoredNodes Favored nodes for new blocks
+   * @return Returns instance of {@link FSDataOutputStream}
+   * @throws IOException
+   */
+  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+      final int bufferSize, final Progressable progress,
+      final InetSocketAddress[] favoredNodes) throws IOException {
+    statistics.incrementWriteOps(1);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FSDataOutputStream>() {
+      @Override
+      public FSDataOutputStream doCall(final Path p)
+          throws IOException {
+        return dfs.append(getPathName(p), bufferSize, flag, progress,
+            statistics, favoredNodes);
+      }
+      @Override
+      public FSDataOutputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.append(p, bufferSize);
+      }
+    }.resolve(this, absF);
+  }
+
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
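
One caveat visible in the resolver above: if path resolution crosses a
symlink into a different FileSystem, next() falls back to the plain
fs.append(p, bufferSize), so the favored-node hint (and any non-default
create flags) are dropped for non-HDFS targets.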

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5d929/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
index 4f11037..2d39896 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
@@ -26,12 +26,14 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Random;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -159,6 +161,33 @@ public class TestFavoredNodesEndToEnd {
     }
   }
 
+  @Test(timeout = 180000)
+  public void testFavoredNodesEndToEndForAppend() throws Exception {
+    // create 10 files with random preferred nodes
+    for (int i = 0; i < NUM_FILES; i++) {
+      Random rand = new Random(System.currentTimeMillis() + i);
+      // pass a newly created rand so as to get a uniform distribution each time
+      // without too many collisions (look at the do-while loop in getDatanodes)
+      InetSocketAddress datanode[] = getDatanodes(rand);
+      Path p = new Path("/filename" + i);
+      // create and close the file.
+      dfs.create(p, FsPermission.getDefault(), true, 4096, (short) 3, 4096L,
+          null, null).close();
+      // re-open for append
+      FSDataOutputStream out = dfs.append(p, EnumSet.of(CreateFlag.APPEND),
+          4096, null, datanode);
+      out.write(SOME_BYTES);
+      out.close();
+      BlockLocation[] locations = getBlockLocations(p);
+      // verify the files got created in the right nodes
+      for (BlockLocation loc : locations) {
+        String[] hosts = loc.getNames();
+        String[] hosts1 = getStringForInetSocketAddrs(datanode);
+        assertTrue(compareNodes(hosts, hosts1));
+      }
+    }
+  }
+
   private BlockLocation[] getBlockLocations(Path p) throws Exception {
     DFSTestUtil.waitReplication(dfs, p, (short)3);
     BlockLocation[] locations = dfs.getClient().getBlockLocations(
