This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new f55a2d6  HADOOP-16049. DistCp result has data and checksum mismatch when blocks per chunk > 0.
f55a2d6 is described below

commit f55a2d6f742f43071ccb3c19e1444a076c706a2e
Author: Kai Xie <gigi...@gmail.com>
AuthorDate: Sun Jan 27 16:58:12 2019 +0000

    HADOOP-16049. DistCp result has data and checksum mismatch when blocks per chunk > 0.
    
    Contributed by Kai Xie.
    
    (cherry picked from commit 6d3e7a8570ce22f1adcce0b9cef6959c273d6ba7)
---
 .../tools/mapred/RetriableFileCopyCommand.java     |  24 +++--
 .../hadoop/tools/util/ThrottledInputStream.java    |  48 +++++-----
 .../org/apache/hadoop/tools/TestDistCpSync.java    |  49 ++++++++--
 .../hadoop/tools/TestDistCpSyncReverseBase.java    | 102 +++++++++++++--------
 .../apache/hadoop/tools/mapred/TestCopyMapper.java |  24 ++++-
 5 files changed, 170 insertions(+), 77 deletions(-)

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index ddf2725..7d7ebd4 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -257,7 +257,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
-      int bytesRead = readBytes(inStream, buf, sourceOffset);
+      seekIfRequired(inStream, sourceOffset);
+      int bytesRead = readBytes(inStream, buf);
       while (bytesRead >= 0) {
         if (chunkLength > 0 &&
             (totalBytesRead + bytesRead) >= chunkLength) {
@@ -273,7 +274,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
         if (finished) {
           break;
         }
-        bytesRead = readBytes(inStream, buf, sourceOffset);
+        bytesRead = readBytes(inStream, buf);
       }
       outStream.close();
       outStream = null;
@@ -296,13 +297,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     context.setStatus(message.toString());
   }
 
-  private static int readBytes(ThrottledInputStream inStream, byte buf[],
-      long position) throws IOException {
+  private static int readBytes(ThrottledInputStream inStream, byte[] buf)
+      throws IOException {
+    try {
+      return inStream.read(buf);
+    } catch (IOException e) {
+      throw new CopyReadException(e);
+    }
+  }
+
+  private static void seekIfRequired(ThrottledInputStream inStream,
+                                     long sourceOffset) throws IOException {
     try {
-      if (position == 0) {
-        return inStream.read(buf);
-      } else {
-        return inStream.read(position, buf, 0, buf.length);
+      if (sourceOffset != inStream.getPos()) {
+        inStream.seek(sourceOffset);
       }
     } catch (IOException e) {
       throw new CopyReadException(e);
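
The hunk above is the core of the fix. The old readBytes() fell back to a
positioned read, read(position, buf, 0, buf.length), whenever the chunk offset
was non-zero, and the copy loop passed the same unchanging sourceOffset on
every iteration, so a copy with blocks per chunk > 0 kept re-reading the same
region of the source. The new code seeks once to the chunk start and then
reads sequentially. A minimal sketch of the resulting pattern, assuming a
Seekable-backed ThrottledInputStream (the class and method names below are
illustrative, not part of the patch):

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.tools.util.ThrottledInputStream;

    final class ChunkCopySketch {
      /** Copy up to chunkLength bytes starting at sourceOffset. */
      static long copyChunk(ThrottledInputStream in, OutputStream out,
          long sourceOffset, long chunkLength) throws IOException {
        // Seek once, and only if the stream is not already at the chunk
        // start; subsequent reads advance the position on their own.
        if (sourceOffset != in.getPos()) {
          in.seek(sourceOffset);
        }
        byte[] buf = new byte[8192];
        long total = 0;
        int n = in.read(buf);
        while (n >= 0) {
          // Clamp the final write so we never copy past the chunk end.
          int toWrite = (chunkLength > 0)
              ? (int) Math.min(n, chunkLength - total) : n;
          out.write(buf, 0, toWrite);
          total += toWrite;
          if (chunkLength > 0 && total >= chunkLength) {
            break; // chunk complete
          }
          n = in.read(buf);
        }
        return total;
      }
    }
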
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
index a0fa0c8..19fcb0a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.tools.util;
 
-import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,7 +33,7 @@ import java.io.InputStream;
  * (Thus, while the read-rate might exceed the maximum for a given short interval,
  * the average tends towards the specified maximum, overall.)
  */
-public class ThrottledInputStream extends InputStream {
+public class ThrottledInputStream extends InputStream implements Seekable {
 
   private final InputStream rawStream;
   private final long maxBytesPerSec;
@@ -95,25 +95,6 @@ public class ThrottledInputStream extends InputStream {
     return readLen;
   }
 
-  /**
-   * Read bytes starting from the specified position. This requires rawStream is
-   * an instance of {@link PositionedReadable}.
-   */
-  public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-    if (!(rawStream instanceof PositionedReadable)) {
-      throw new UnsupportedOperationException(
-          "positioned read is not supported by the internal stream");
-    }
-    throttle();
-    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
-        offset, length);
-    if (readLen != -1) {
-      bytesRead += readLen;
-    }
-    return readLen;
-  }
-
   private void throttle() throws IOException {
     while (getBytesPerSec() > maxBytesPerSec) {
       try {
@@ -165,4 +146,29 @@ public class ThrottledInputStream extends InputStream {
         ", totalSleepTime=" + totalSleepTime +
         '}';
   }
+
+  private void checkSeekable() throws IOException {
+    if (!(rawStream instanceof Seekable)) {
+      throw new UnsupportedOperationException(
+          "seek operations are unsupported by the internal stream");
+    }
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    checkSeekable();
+    ((Seekable) rawStream).seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    checkSeekable();
+    return ((Seekable) rawStream).getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    checkSeekable();
+    return ((Seekable) rawStream).seekToNewSource(targetPos);
+  }
 }
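
With the positioned-read method gone, the wrapper's only positioning primitive
is seek(), delegated to the underlying stream when that stream implements
Seekable (FSDataInputStream does). A hedged usage sketch; the path and rate
are illustrative, and the two-argument constructor is assumed from the
existing class:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.util.ThrottledInputStream;

    public class ThrottledSeekSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        try (FSDataInputStream raw = fs.open(new Path("/tmp/example"))) {
          ThrottledInputStream in =
              new ThrottledInputStream(raw, 1024 * 1024); // 1 MB/s cap
          in.seek(4096);            // delegates to raw.seek(4096)
          byte[] buf = new byte[1024];
          int n = in.read(buf);     // throttled sequential read
          System.out.println("read " + n + " bytes, now at " + in.getPos());
        }
      }
    }

Note that seekToNewSource() is forwarded as well, so retries against a
different replica keep working whenever the wrapped stream supports them.
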
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
index 94e8604..71ee11b 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.tools;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +28,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -35,8 +38,10 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -44,20 +49,35 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class TestDistCpSync {
-  private MiniDFSCluster cluster;
-  private final Configuration conf = new HdfsConfiguration();
+  private static MiniDFSCluster cluster;
+  private static final short DATA_NUM = 1;
+  private static final Configuration DFS_CONF = new HdfsConfiguration();
+
+  private Configuration conf;
   private DistributedFileSystem dfs;
   private DistCpOptions options;
   private final Path source = new Path("/source");
   private final Path target = new Path("/target");
   private final long BLOCK_SIZE = 1024;
-  private final short DATA_NUM = 1;
 
-  @Before
-  public void setUp() throws Exception {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build();
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = new MiniDFSCluster.Builder(DFS_CONF)
+        .numDataNodes(DATA_NUM)
+        .build();
     cluster.waitActive();
+  }
 
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void init() throws Exception {
+    conf = new HdfsConfiguration(DFS_CONF);
     dfs = cluster.getFileSystem();
     dfs.mkdirs(source);
     dfs.mkdirs(target);
@@ -72,11 +92,20 @@ public class TestDistCpSync {
   }
 
   @After
-  public void tearDown() throws Exception {
-    IOUtils.cleanup(null, dfs);
-    if (cluster != null) {
-      cluster.shutdown();
+  public void cleanup() throws Exception {
+    SnapshotManager snapshotManager = cluster
+        .getNameNode()
+        .getNamesystem()
+        .getSnapshotManager();
+    for (SnapshotInfo.Bean snapshot : snapshotManager.getSnapshots()) {
+      dfs.deleteSnapshot(new Path(
+          StringUtils.substringBefore(
+              snapshot.getSnapshotDirectory(), ".snapshot")),
+          snapshot.getSnapshotID());
     }
+    dfs.delete(source, true);
+    dfs.delete(target, true);
+    IOUtils.cleanup(null, dfs);
   }
 
   /**
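
Beyond the copy fix, the test classes switch from one MiniDFSCluster per test
method to a single cluster per class, started in @BeforeClass and shut down
in @AfterClass, with per-test state reset in @Before/@After. A stripped-down
sketch of the pattern (the class name is illustrative, not from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;

    public class SharedClusterTestBase {
      private static MiniDFSCluster cluster;

      @BeforeClass
      public static void startCluster() throws Exception {
        Configuration conf = new HdfsConfiguration();
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        cluster.waitActive(); // started once, reused by every test method
      }

      @AfterClass
      public static void stopCluster() {
        if (cluster != null) {
          cluster.shutdown(); // torn down once, after the last test
        }
      }
    }
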
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java
index fea374e..e9f7e0c 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.tools;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsShell;
@@ -27,6 +28,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -34,8 +37,10 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -53,15 +58,17 @@ import java.util.StringTokenizer;
  * Shared by "-rdiff s2 s1 src tgt" and "-rdiff s2 s1 tgt tgt"
  */
 public abstract class TestDistCpSyncReverseBase {
-  private MiniDFSCluster cluster;
-  private final Configuration conf = new HdfsConfiguration();
+  private static MiniDFSCluster cluster;
+  private static final short DATA_NUM = 1;
+  private static final Configuration DFS_CONF = new HdfsConfiguration();
+
+  private Configuration conf;
   private DistributedFileSystem dfs;
   private DistCpOptions options;
   private Path source;
   private boolean isSrcNotSameAsTgt = true;
   private final Path target = new Path("/target");
   private final long blockSize = 1024;
-  private final short dataNum = 1;
 
   abstract void initSourcePath();
 
@@ -126,13 +133,25 @@ public abstract class TestDistCpSyncReverseBase {
     isSrcNotSameAsTgt = srcNotSameAsTgt;
   }
 
-  @Before
-  public void setUp() throws Exception {
-    initSourcePath();
-
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNum).build();
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = new MiniDFSCluster.Builder(DFS_CONF)
+        .numDataNodes(DATA_NUM)
+        .build();
     cluster.waitActive();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
 
+  @Before
+  public void init() throws Exception {
+    initSourcePath();
+    conf = new HdfsConfiguration(DFS_CONF);
     dfs = cluster.getFileSystem();
     if (isSrcNotSameAsTgt) {
       dfs.mkdirs(source);
@@ -149,11 +168,20 @@ public abstract class TestDistCpSyncReverseBase {
   }
 
   @After
-  public void tearDown() throws Exception {
-    IOUtils.cleanup(null, dfs);
-    if (cluster != null) {
-      cluster.shutdown();
+  public void cleanup() throws Exception {
+    SnapshotManager snapshotManager = cluster
+        .getNameNode()
+        .getNamesystem()
+        .getSnapshotManager();
+    for (SnapshotInfo.Bean snapshot : snapshotManager.getSnapshots()) {
+      dfs.deleteSnapshot(new Path(
+              StringUtils.substringBefore(
+                  snapshot.getSnapshotDirectory(), ".snapshot")),
+          snapshot.getSnapshotID());
     }
+    dfs.delete(source, true);
+    dfs.delete(target, true);
+    IOUtils.cleanup(null, dfs);
   }
 
   /**
@@ -254,10 +282,10 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f3 = new Path(d1, "f3");
     final Path f4 = new Path(d2, "f4");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f4, blockSize, dataNum, 0);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f3, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f4, blockSize, DATA_NUM, 0);
   }
 
   /**
@@ -297,7 +325,7 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f1 = new Path(newfoo, "f1");
     dfs.delete(f1, true);
     numDeletedModified += 1; // delete ./foo/f1
-    DFSTestUtil.createFile(dfs, f1, 2 * blockSize, dataNum, 0);
+    DFSTestUtil.createFile(dfs, f1, 2 * blockSize, DATA_NUM, 0);
     DFSTestUtil.appendFile(dfs, f2, (int) blockSize);
     numDeletedModified += 1; // modify ./bar/f2
     dfs.rename(bar, new Path(dir, "foo"));
@@ -451,9 +479,9 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f2 = new Path(foo, "f2");
     final Path f3 = new Path(bar, "f3");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 1L);
-    DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 2L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 1L);
+    DFSTestUtil.createFile(dfs, f3, blockSize, DATA_NUM, 2L);
   }
 
   private void changeData2(Path dir) throws Exception {
@@ -495,9 +523,9 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f2 = new Path(foo, "file");
     final Path f3 = new Path(bar, "file");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize * 2, dataNum, 1L);
-    DFSTestUtil.createFile(dfs, f3, blockSize * 3, dataNum, 2L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize * 2, DATA_NUM, 1L);
+    DFSTestUtil.createFile(dfs, f3, blockSize * 3, DATA_NUM, 2L);
   }
 
   private void changeData3(Path dir) throws Exception {
@@ -543,7 +571,7 @@ public abstract class TestDistCpSyncReverseBase {
     final Path d2 = new Path(d1, "d2");
     final Path f1 = new Path(d2, "f1");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData4(Path dir) throws Exception {
@@ -594,8 +622,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f1 = new Path(d1, "f1");
     final Path f2 = new Path(d2, "f2");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData5(Path dir) throws Exception {
@@ -694,8 +722,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path foo_f1 = new Path(foo, "f1");
     final Path bar_f1 = new Path(bar, "f1");
 
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData6(Path dir) throws Exception {
@@ -736,8 +764,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path foo_f1 = new Path(foo, "f1");
     final Path bar_f1 = new Path(bar, "f1");
 
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData7(Path dir) throws Exception {
@@ -750,7 +778,7 @@ public abstract class TestDistCpSyncReverseBase {
 
     int numDeletedAndModified = 0;
     dfs.rename(foo, foo2);
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
     DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize);
     dfs.rename(foo_f1, foo2_f2);
     /*
@@ -763,7 +791,7 @@ M       ./foo
 +       ./foo/f2
      */
     numDeletedAndModified += 1; // "M ./foo"
-    DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, DATA_NUM, 0L);
     return numDeletedAndModified;
   }
 
@@ -793,9 +821,9 @@ M       ./foo
     final Path bar_f1 = new Path(bar, "f1");
     final Path d1_f1 = new Path(d1, "f1");
 
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, d1_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, d1_f1, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData8(Path dir, boolean createMiddleSnapshot)
@@ -813,8 +841,8 @@ M       ./foo
     final Path bar1 = new Path(dir, "bar1");
 
     int numDeletedAndModified = 0;
-    DFSTestUtil.createFile(dfs, foo_f3, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f3, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, DATA_NUM, 0L);
     dfs.rename(createdDir_f1, foo_f4);
     dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1
     numDeletedAndModified += 1; // modify ./c/foo/d1
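
Both test classes reset the shared cluster between cases by deleting every
live snapshot first, since a snapshottable directory that still has snapshots
cannot simply be removed. A hedged extraction of that cleanup loop (the
helper class and method names are illustrative):

    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
    import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;

    final class SnapshotCleanupSketch {
      static void deleteAllSnapshots(DistributedFileSystem dfs,
          SnapshotManager snapshotManager) throws Exception {
        for (SnapshotInfo.Bean snapshot : snapshotManager.getSnapshots()) {
          // getSnapshotDirectory() looks like "<root>/.snapshot/<name>";
          // strip from ".snapshot" on to recover the snapshottable root.
          Path root = new Path(StringUtils.substringBefore(
              snapshot.getSnapshotDirectory(), ".snapshot"));
          dfs.deleteSnapshot(root, snapshot.getSnapshotID());
        }
      }
    }
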
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
index 9f7a1e0..c10e9b9 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.CopyListingFileStatus;
@@ -55,6 +56,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
 public class TestCopyMapper {
   private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
   private static List<Path> pathList = new ArrayList<Path>();
@@ -248,7 +253,11 @@ public class TestCopyMapper {
 
     // do the distcp again with -update and -append option
     CopyMapper copyMapper = new CopyMapper();
-    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Configuration conf = getConfiguration();
+    // set the buffer size to 1/10th the size of the file.
+    conf.setInt(DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
+        DEFAULT_FILE_SIZE/10);
+    StubContext stubContext = new StubContext(conf, null, 0);
     Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
         stubContext.getContext();
     // Enable append 
@@ -257,6 +266,10 @@ public class TestCopyMapper {
     copyMapper.setup(context);
 
     int numFiles = 0;
+    MetricsRecordBuilder rb =
+        getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    String readCounter = "ReadsFromLocalClient";
+    long readsFromClient = getLongCounter(readCounter, rb);
     for (Path path: pathList) {
       if (fs.getFileStatus(path).isFile()) {
         numFiles++;
@@ -274,6 +287,15 @@ public class TestCopyMapper {
         .getValue());
     Assert.assertEquals(numFiles, stubContext.getReporter().
         getCounter(CopyMapper.Counter.COPY).getValue());
+    rb = getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    /*
+     * added as part of HADOOP-15292 to ensure that multiple readBlock()
+     * operations are not performed to read a block from a single Datanode.
+     * assert assumes that there is only one block per file, and that the number
+     * of files appended to in appendSourceData() above is captured by the
+     * variable numFiles.
+     */
+    assertCounter(readCounter, readsFromClient + numFiles, rb);
   }
 
   private void testCopy(boolean preserveChecksum) throws Exception {
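
The new assertion records the DataNode's ReadsFromLocalClient counter before
the copy and checks afterwards that it grew by exactly one read per appended
file, guarding against the repeated-read behaviour flagged under
HADOOP-15292. The before/after counter pattern, sketched with hypothetical
helper methods:

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.metrics2.MetricsRecordBuilder;

    final class ReadCounterCheck {
      /** Snapshot the counter before the operation under test. */
      static long readsBefore(String dnMetricsName) {
        MetricsRecordBuilder rb = getMetrics(dnMetricsName);
        return getLongCounter("ReadsFromLocalClient", rb);
      }

      /** Assert exactly expectedNewReads more reads happened since. */
      static void assertReadsDelta(String dnMetricsName, long before,
          long expectedNewReads) {
        MetricsRecordBuilder rb = getMetrics(dnMetricsName);
        assertCounter("ReadsFromLocalClient",
            before + expectedNewReads, rb);
      }
    }
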

