This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new f55a2d6  HADOOP-16049. DistCp result has data and checksum mismatch when blocks per chunk > 0.
f55a2d6 is described below

commit f55a2d6f742f43071ccb3c19e1444a076c706a2e
Author: Kai Xie <gigi...@gmail.com>
AuthorDate: Sun Jan 27 16:58:12 2019 +0000

    HADOOP-16049. DistCp result has data and checksum mismatch when blocks
    per chunk > 0. Contributed by Kai Xie.

    (cherry picked from commit 6d3e7a8570ce22f1adcce0b9cef6959c273d6ba7)
---
 .../tools/mapred/RetriableFileCopyCommand.java     |  24 +++--
 .../hadoop/tools/util/ThrottledInputStream.java    |  48 +++++-----
 .../org/apache/hadoop/tools/TestDistCpSync.java    |  49 ++++++++--
 .../hadoop/tools/TestDistCpSyncReverseBase.java    | 102 +++++++++++++--------
 .../apache/hadoop/tools/mapred/TestCopyMapper.java |  24 ++++-
 5 files changed, 170 insertions(+), 77 deletions(-)

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index ddf2725..7d7ebd4 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -257,7 +257,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
-      int bytesRead = readBytes(inStream, buf, sourceOffset);
+      seekIfRequired(inStream, sourceOffset);
+      int bytesRead = readBytes(inStream, buf);
       while (bytesRead >= 0) {
         if (chunkLength > 0 &&
             (totalBytesRead + bytesRead) >= chunkLength) {
@@ -273,7 +274,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
         if (finished) {
           break;
         }
-        bytesRead = readBytes(inStream, buf, sourceOffset);
+        bytesRead = readBytes(inStream, buf);
       }
       outStream.close();
       outStream = null;
@@ -296,13 +297,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     context.setStatus(message.toString());
   }
 
-  private static int readBytes(ThrottledInputStream inStream, byte buf[],
-      long position) throws IOException {
+  private static int readBytes(ThrottledInputStream inStream, byte[] buf)
+      throws IOException {
+    try {
+      return inStream.read(buf);
+    } catch (IOException e) {
+      throw new CopyReadException(e);
+    }
+  }
+
+  private static void seekIfRequired(ThrottledInputStream inStream,
+      long sourceOffset) throws IOException {
     try {
-      if (position == 0) {
-        return inStream.read(buf);
-      } else {
-        return inStream.read(position, buf, 0, buf.length);
+      if (sourceOffset != inStream.getPos()) {
+        inStream.seek(sourceOffset);
       }
     } catch (IOException e) {
       throw new CopyReadException(e);
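A note on the change above: the old copy loop passed the same sourceOffset into a positioned read on every iteration, so the read position never advanced through the chunk and any chunk larger than one buffer re-read the same region, producing the data/checksum mismatch named in the subject. The patch instead seeks once up front and then reads sequentially. Below is a minimal sketch of that seek-then-sequential-read pattern; the class and method names are illustrative only, not the patch's actual code:

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.fs.FSDataInputStream;

    /** Illustrative sketch only: seek once, then copy one chunk sequentially. */
    final class ChunkCopySketch {
      static long copyChunk(FSDataInputStream in, OutputStream out,
          long offset, long chunkLength, int bufferSize) throws IOException {
        if (in.getPos() != offset) {
          in.seek(offset);                  // position the stream once, up front
        }
        byte[] buf = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = in.read(buf)) >= 0) {   // sequential reads advance getPos()
          int toWrite = n;
          if (chunkLength > 0 && total + n > chunkLength) {
            toWrite = (int) (chunkLength - total);  // trim at the chunk boundary
          }
          out.write(buf, 0, toWrite);
          total += toWrite;
          if (chunkLength > 0 && total >= chunkLength) {
            break;
          }
        }
        return total;
      }
    }

The same idea appears in the patch as seekIfRequired() followed by the simplified readBytes().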
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
index a0fa0c8..19fcb0a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.tools.util;
 
-import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,7 +33,7 @@ import java.io.InputStream;
  * (Thus, while the read-rate might exceed the maximum for a given short interval,
  * the average tends towards the specified maximum, overall.)
  */
-public class ThrottledInputStream extends InputStream {
+public class ThrottledInputStream extends InputStream implements Seekable {
 
   private final InputStream rawStream;
   private final long maxBytesPerSec;
@@ -95,25 +95,6 @@ public class ThrottledInputStream extends InputStream {
     return readLen;
   }
 
-  /**
-   * Read bytes starting from the specified position. This requires rawStream is
-   * an instance of {@link PositionedReadable}.
-   */
-  public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-    if (!(rawStream instanceof PositionedReadable)) {
-      throw new UnsupportedOperationException(
-          "positioned read is not supported by the internal stream");
-    }
-    throttle();
-    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
-        offset, length);
-    if (readLen != -1) {
-      bytesRead += readLen;
-    }
-    return readLen;
-  }
-
   private void throttle() throws IOException {
     while (getBytesPerSec() > maxBytesPerSec) {
       try {
@@ -165,4 +146,29 @@ public class ThrottledInputStream extends InputStream {
         ", totalSleepTime=" + totalSleepTime +
         '}';
   }
+
+  private void checkSeekable() throws IOException {
+    if (!(rawStream instanceof Seekable)) {
+      throw new UnsupportedOperationException(
+          "seek operations are unsupported by the internal stream");
+    }
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    checkSeekable();
+    ((Seekable) rawStream).seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    checkSeekable();
+    return ((Seekable) rawStream).getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    checkSeekable();
+    return ((Seekable) rawStream).seekToNewSource(targetPos);
+  }
 }
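With ThrottledInputStream now implementing Seekable by delegating to the wrapped stream, a caller can position the throttled stream directly instead of relying on the removed positioned-read method. A hypothetical usage sketch against this patched branch (the file path, offset, and 1 MB/s rate below are made-up values for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.util.ThrottledInputStream;

    public class ThrottledSeekExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream raw = fs.open(new Path("/source/file"));
        // throttle to ~1 MB/s; seek/getPos delegate to the wrapped stream
        ThrottledInputStream in = new ThrottledInputStream(raw, 1024 * 1024);
        in.seek(4096);                      // lands on the underlying Seekable
        byte[] buf = new byte[1024];
        int n = in.read(buf);               // throttled sequential read
        System.out.println("read " + n + " bytes, now at " + in.getPos());
        in.close();
      }
    }

If the wrapped stream is not Seekable, seek()/getPos() throw UnsupportedOperationException, mirroring the guard the old positioned-read method used.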
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
index 94e8604..71ee11b 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.tools;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +28,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -35,8 +38,10 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -44,20 +49,35 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class TestDistCpSync {
-  private MiniDFSCluster cluster;
-  private final Configuration conf = new HdfsConfiguration();
+  private static MiniDFSCluster cluster;
+  private static final short DATA_NUM = 1;
+  private static final Configuration DFS_CONF = new HdfsConfiguration();
+
+  private Configuration conf;
   private DistributedFileSystem dfs;
   private DistCpOptions options;
   private final Path source = new Path("/source");
   private final Path target = new Path("/target");
   private final long BLOCK_SIZE = 1024;
-  private final short DATA_NUM = 1;
 
-  @Before
-  public void setUp() throws Exception {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build();
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = new MiniDFSCluster.Builder(DFS_CONF)
+        .numDataNodes(DATA_NUM)
+        .build();
     cluster.waitActive();
+  }
 
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void init() throws Exception {
+    conf = new HdfsConfiguration(DFS_CONF);
     dfs = cluster.getFileSystem();
     dfs.mkdirs(source);
     dfs.mkdirs(target);
@@ -72,11 +92,20 @@ public class TestDistCpSync {
   }
 
   @After
-  public void tearDown() throws Exception {
-    IOUtils.cleanup(null, dfs);
-    if (cluster != null) {
-      cluster.shutdown();
+  public void cleanup() throws Exception {
+    SnapshotManager snapshotManager = cluster
+        .getNameNode()
+        .getNamesystem()
+        .getSnapshotManager();
+    for (SnapshotInfo.Bean snapshot : snapshotManager.getSnapshots()) {
+      dfs.deleteSnapshot(new Path(
+          StringUtils.substringBefore(
+              snapshot.getSnapshotDirectory(), ".snapshot")),
+          snapshot.getSnapshotID());
     }
+    dfs.delete(source, true);
+    dfs.delete(target, true);
+    IOUtils.cleanup(null, dfs);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java
index fea374e..e9f7e0c 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSyncReverseBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.tools;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsShell;
@@ -27,6 +28,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -34,8 +37,10 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -53,15 +58,17 @@ import java.util.StringTokenizer;
  * Shared by "-rdiff s2 s1 src tgt" and "-rdiff s2 s1 tgt tgt"
  */
 public abstract class TestDistCpSyncReverseBase {
-  private MiniDFSCluster cluster;
-  private final Configuration conf = new HdfsConfiguration();
+  private static MiniDFSCluster cluster;
+  private static final short DATA_NUM = 1;
+  private static final Configuration DFS_CONF = new HdfsConfiguration();
+
+  private Configuration conf;
   private DistributedFileSystem dfs;
   private DistCpOptions options;
   private Path source;
   private boolean isSrcNotSameAsTgt = true;
   private final Path target = new Path("/target");
   private final long blockSize = 1024;
-  private final short dataNum = 1;
 
   abstract void initSourcePath();
 
@@ -126,13 +133,25 @@ public abstract class TestDistCpSyncReverseBase {
     isSrcNotSameAsTgt = srcNotSameAsTgt;
   }
 
-  @Before
-  public void setUp() throws Exception {
-    initSourcePath();
-
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNum).build();
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = new MiniDFSCluster.Builder(DFS_CONF)
+        .numDataNodes(DATA_NUM)
+        .build();
     cluster.waitActive();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
 
+  @Before
+  public void init() throws Exception {
+    initSourcePath();
+    conf = new HdfsConfiguration(DFS_CONF);
    dfs = cluster.getFileSystem();
     if (isSrcNotSameAsTgt) {
       dfs.mkdirs(source);
@@ -149,11 +168,20 @@ public abstract class TestDistCpSyncReverseBase {
   }
 
   @After
-  public void tearDown() throws Exception {
-    IOUtils.cleanup(null, dfs);
-    if (cluster != null) {
-      cluster.shutdown();
+  public void cleanup() throws Exception {
+    SnapshotManager snapshotManager = cluster
+        .getNameNode()
+        .getNamesystem()
+        .getSnapshotManager();
+    for (SnapshotInfo.Bean snapshot : snapshotManager.getSnapshots()) {
+      dfs.deleteSnapshot(new Path(
+          StringUtils.substringBefore(
+              snapshot.getSnapshotDirectory(), ".snapshot")),
+          snapshot.getSnapshotID());
     }
+    dfs.delete(source, true);
+    dfs.delete(target, true);
+    IOUtils.cleanup(null, dfs);
   }
 
   /**
@@ -254,10 +282,10 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f3 = new Path(d1, "f3");
     final Path f4 = new Path(d2, "f4");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 0);
-    DFSTestUtil.createFile(dfs, f4, blockSize, dataNum, 0);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f3, blockSize, DATA_NUM, 0);
+    DFSTestUtil.createFile(dfs, f4, blockSize, DATA_NUM, 0);
   }
 
   /**
@@ -297,7 +325,7 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f1 = new Path(newfoo, "f1");
     dfs.delete(f1, true);
     numDeletedModified += 1; // delete ./foo/f1
-    DFSTestUtil.createFile(dfs, f1, 2 * blockSize, dataNum, 0);
+    DFSTestUtil.createFile(dfs, f1, 2 * blockSize, DATA_NUM, 0);
     DFSTestUtil.appendFile(dfs, f2, (int) blockSize);
     numDeletedModified += 1; // modify ./bar/f2
     dfs.rename(bar, new Path(dir, "foo"));
@@ -451,9 +479,9 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f2 = new Path(foo, "f2");
     final Path f3 = new Path(bar, "f3");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 1L);
-    DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 2L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 1L);
+    DFSTestUtil.createFile(dfs, f3, blockSize, DATA_NUM, 2L);
   }
 
   private void changeData2(Path dir) throws Exception {
@@ -495,9 +523,9 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f2 = new Path(foo, "file");
     final Path f3 = new Path(bar, "file");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize * 2, dataNum, 1L);
-    DFSTestUtil.createFile(dfs, f3, blockSize * 3, dataNum, 2L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize * 2, DATA_NUM, 1L);
+    DFSTestUtil.createFile(dfs, f3, blockSize * 3, DATA_NUM, 2L);
   }
 
   private void changeData3(Path dir) throws Exception {
@@ -543,7 +571,7 @@ public abstract class TestDistCpSyncReverseBase {
     final Path d2 = new Path(d1, "d2");
     final Path f1 = new Path(d2, "f1");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData4(Path dir) throws Exception {
@@ -594,8 +622,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path f1 = new Path(d1, "f1");
     final Path f2 = new Path(d2, "f2");
 
-    DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, f2, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData5(Path dir) throws Exception {
@@ -694,8 +722,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path foo_f1 = new Path(foo, "f1");
     final Path bar_f1 = new Path(bar, "f1");
 
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData6(Path dir) throws Exception {
@@ -736,8 +764,8 @@ public abstract class TestDistCpSyncReverseBase {
     final Path foo_f1 = new Path(foo, "f1");
     final Path bar_f1 = new Path(bar, "f1");
 
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData7(Path dir) throws Exception {
@@ -750,7 +778,7 @@ public abstract class TestDistCpSyncReverseBase {
     int numDeletedAndModified = 0;
 
     dfs.rename(foo, foo2);
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
     DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize);
     dfs.rename(foo_f1, foo2_f2);
     /*
@@ -763,7 +791,7 @@ M ./foo
 + ./foo/f2
 */
     numDeletedAndModified += 1; // "M ./foo"
-    DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, DATA_NUM, 0L);
     return numDeletedAndModified;
   }
 
@@ -793,9 +821,9 @@ M ./foo
     final Path bar_f1 = new Path(bar, "f1");
     final Path d1_f1 = new Path(d1, "f1");
 
-    DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, d1_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, bar_f1, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, d1_f1, blockSize, DATA_NUM, 0L);
   }
 
   private int changeData8(Path dir, boolean createMiddleSnapshot)
@@ -813,8 +841,8 @@ M ./foo
     final Path bar1 = new Path(dir, "bar1");
     int numDeletedAndModified = 0;
 
-    DFSTestUtil.createFile(dfs, foo_f3, blockSize, dataNum, 0L);
-    DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, dataNum, 0L);
+    DFSTestUtil.createFile(dfs, foo_f3, blockSize, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, DATA_NUM, 0L);
     dfs.rename(createdDir_f1, foo_f4);
     dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1
     numDeletedAndModified += 1; // modify ./c/foo/d1

diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
index 9f7a1e0..c10e9b9 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.CopyListingFileStatus;
@@ -55,6 +56,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
 public class TestCopyMapper {
   private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
   private static List<Path> pathList = new ArrayList<Path>();
@@ -248,7 +253,11 @@ public class TestCopyMapper {
 
     // do the distcp again with -update and -append option
     CopyMapper copyMapper = new CopyMapper();
-    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Configuration conf = getConfiguration();
+    // set the buffer size to 1/10th the size of the file.
+    conf.setInt(DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
+        DEFAULT_FILE_SIZE/10);
+    StubContext stubContext = new StubContext(conf, null, 0);
     Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
         stubContext.getContext();
     // Enable append
@@ -257,6 +266,10 @@ public class TestCopyMapper {
     copyMapper.setup(context);
 
     int numFiles = 0;
+    MetricsRecordBuilder rb =
+        getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    String readCounter = "ReadsFromLocalClient";
+    long readsFromClient = getLongCounter(readCounter, rb);
     for (Path path: pathList) {
       if (fs.getFileStatus(path).isFile()) {
         numFiles++;
@@ -274,6 +287,15 @@ public class TestCopyMapper {
         .getValue());
     Assert.assertEquals(numFiles, stubContext.getReporter().
         getCounter(CopyMapper.Counter.COPY).getValue());
+    rb = getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    /*
+     * added as part of HADOOP-15292 to ensure that multiple readBlock()
+     * operations are not performed to read a block from a single Datanode.
+     * assert assumes that there is only one block per file, and that the number
+     * of files appended to in appendSourceData() above is captured by the
+     * variable numFiles.
+     */
+    assertCounter(readCounter, readsFromClient + numFiles, rb);
   }
 
   private void testCopy(boolean preserveChecksum) throws Exception {

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org