Author: vinodkv
Date: Fri Mar 21 18:36:44 2014
New Revision: 1580007
URL: http://svn.apache.org/r1580007
Log:
YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write
more log-data than the log-length that it records. Contributed by Mit Desai.
svn merge --ignore-ancestry -c 1580005 ../../trunk/
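
Background on the failure mode: for each container log file, the aggregated-log format writes a UTF-encoded file name, a UTF-encoded decimal length, and then the raw log bytes; the reader consumes exactly the recorded number of bytes before parsing the next header. If the container keeps appending while LogValue.write() copies the file, the copy can run past the recorded length, and the reader later parses log bytes as a length field, typically failing with a NumberFormatException. A minimal reader-side sketch of that assumption (illustrative only, not the Hadoop implementation; readOneLog is a hypothetical name):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.Writer;

    class LengthPrefixedReadSketch {
      // Reads one (fileName, length, bytes) record, trusting the recorded length.
      static void readOneLog(DataInputStream dis, Writer out) throws IOException {
        String fileName = dis.readUTF();
        // Throws NumberFormatException if the previous record overran its
        // recorded length and we are now reading log bytes, not a header.
        long fileLength = Long.parseLong(dis.readUTF());
        out.write(fileName + ":\n");
        byte[] buf = new byte[65535];
        long pending = fileLength;
        while (pending > 0) {
          int len = dis.read(buf, 0, (int) Math.min(buf.length, pending));
          if (len == -1) {
            break; // stream ended early; the record is truncated
          }
          out.write(new String(buf, 0, len));
          pending -= len;
        }
      }
    }
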
Modified:
    hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
    hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt?rev=1580007&r1=1580006&r2=1580007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt Fri Mar 21 18:36:44 2014
@@ -483,6 +483,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1811. Fixed AMFilters in YARN to correctly accept requests from either
     web-app proxy or the RMs when HA is enabled. (Robert Kanter via vinodkv)
 
+    YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write
+    more log-data than the log-length that it records. (Mit Desai via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java?rev=1580007&r1=1580006&r2=1580007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java Fri Mar 21 18:36:44 2014
@@ -182,20 +182,29 @@ public class AggregatedLogFormat {
       Arrays.sort(logFiles);
       for (File logFile : logFiles) {
+        long fileLength = 0;
+
         // Write the logFile Type
         out.writeUTF(logFile.getName());
 
         // Write the log length as UTF so that it is printable
-        out.writeUTF(String.valueOf(logFile.length()));
+        out.writeUTF(String.valueOf(fileLength = logFile.length()));
 
         // Write the log itself
         FileInputStream in = null;
         try {
           in = SecureIOUtils.openForRead(logFile, getUser(), null);
           byte[] buf = new byte[65535];
+          long curRead = 0;
           int len = 0;
-          while ((len = in.read(buf)) != -1) {
+          while (((len = in.read(buf)) != -1) && (curRead < fileLength)) {
             out.write(buf, 0, len);
+            curRead += len;
+          }
+          long newLength = logFile.length();
+          if (fileLength < newLength) {
+            LOG.warn("Aggregated Logs Truncated by "
+                + (newLength - fileLength) + " bytes.");
           }
         } catch (IOException e) {
           String message = "Error aggregating log file. Log file : "
@@ -553,7 +562,7 @@ public class AggregatedLogFormat {
       out.println(fileLengthStr);
       out.println("Log Contents:");
 
-      int curRead = 0;
+      long curRead = 0;
       long pendingRead = fileLength - curRead;
       int toRead =
           pendingRead > buf.length ? buf.length : (int) pendingRead;
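
The second hunk is a companion overflow fix on the read side: with an int counter, curRead wraps negative once more than Integer.MAX_VALUE (about 2 GiB) of log bytes have been read, which corrupts the pendingRead arithmetic. A self-contained illustration of the wrap, with an assumed 3 GiB log size (not a value from the patch):

    public class IntCounterOverflow {
      public static void main(String[] args) {
        long fileLength = 3L * 1024 * 1024 * 1024; // hypothetical 3 GiB log
        int curRead = Integer.MAX_VALUE;           // int counter after ~2 GiB
        curRead += 1;                              // wraps to -2147483648
        long pendingRead = fileLength - curRead;   // 5368709120, not the ~1 GiB left
        System.out.println(curRead + " " + pendingRead);
      }
    }
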
Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java?rev=1580007&r1=1580006&r2=1580007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java Fri Mar 21 18:36:44 2014
@@ -34,6 +34,7 @@ import java.io.UnsupportedEncodingExcept
 import java.io.Writer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 
 import junit.framework.Assert;
@@ -87,6 +88,96 @@ public class TestAggregatedLogFormat {
     fs.delete(workDirPath, true);
   }
 
+  // Test for corrupted aggregated logs. LogValue.write() should not write
+  // more data than the log length it records, even if the application is
+  // still appending to its logs.
+  @Test
+  public void testForCorruptedAggregatedLogs() throws Exception {
+    Configuration conf = new Configuration();
+    File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
+    Path remoteAppLogFile =
+        new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
+    Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
+    ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1);
+    Path t =
+        new Path(srcFileRoot, testContainerId.getApplicationAttemptId()
+            .getApplicationId().toString());
+    Path srcFilePath = new Path(t, testContainerId.toString());
+
+    long numChars = 950000;
+
+    writeSrcFileAndALog(srcFilePath, "stdout", numChars, remoteAppLogFile,
+        srcFileRoot, testContainerId);
+
+    LogReader logReader = new LogReader(conf, remoteAppLogFile);
+    LogKey rLogKey = new LogKey();
+    DataInputStream dis = logReader.next(rLogKey);
+    Writer writer = new StringWriter();
+    try {
+      LogReader.readAcontainerLogs(dis, writer);
+    } catch (Exception e) {
+      // A NumberFormatException means the reader parsed log data where it
+      // expected the next record's length field, i.e. the logs are corrupt.
+      if (e.toString().contains("NumberFormatException")) {
+        Assert.fail("Aggregated logs are corrupted.");
+      }
+    }
+  }
+
+  private void writeSrcFileAndALog(Path srcFilePath, String fileName,
+      final long length, Path remoteAppLogFile, Path srcFileRoot,
+      ContainerId testContainerId) throws Exception {
+    File dir = new File(srcFilePath.toString());
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Unable to create directory : " + dir);
+      }
+    }
+
+    File outputFile = new File(new File(srcFilePath.toString()), fileName);
+    FileOutputStream os = new FileOutputStream(outputFile);
+    final OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
+    final int ch = filler;
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
+
+    LogKey logKey = new LogKey(testContainerId);
+    LogValue logValue =
+        spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
+            testContainerId, ugi.getShortUserName()));
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          for (int i = 0; i < length / 3; i++) {
+            osw.write(ch);
+          }
+
+          latch.countDown();
+
+          for (int i = 0; i < (2 * length) / 3; i++) {
+            osw.write(ch);
+          }
+          osw.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    t.start();
+
+    // Wait till the osw is partially written; aggregation starts once the
+    // osw has completed one third of its work.
+    latch.await();
+
+    // Aggregate the logs
+    logWriter.append(logKey, logValue);
+    logWriter.close();
+  }
+
   // Verify the output generated by readAContainerLogs(DataInputStream, Writer)
   @Test
   public void testReadAcontainerLogs1() throws Exception {
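
The new test reproduces the race deterministically with a CountDownLatch: a writer thread emits the first third of the log, releases the latch, and keeps appending while the main thread runs the aggregation. A stripped-down sketch of that coordination pattern, independent of the YARN classes (a synchronized StringBuffer stands in for the log file):

    import java.util.concurrent.CountDownLatch;

    public class RacyAppendSketch {
      public static void main(String[] args) throws InterruptedException {
        final StringBuffer log = new StringBuffer();  // stand-in for the log file
        final CountDownLatch partiallyWritten = new CountDownLatch(1);

        Thread appender = new Thread() {
          public void run() {
            for (int i = 0; i < 1000; i++) log.append('x');  // first third
            partiallyWritten.countDown();                    // let aggregation start
            for (int i = 0; i < 2000; i++) log.append('x');  // keep appending
          }
        };
        appender.start();

        partiallyWritten.await();            // aggregation begins mid-write
        int recordedLength = log.length();   // analogous to logFile.length()
        // A correct writer copies at most recordedLength bytes even though the
        // appender is still growing the log; any surplus is logged as truncated.
        appender.join();
        System.out.println("recorded=" + recordedLength + " final=" + log.length());
      }
    }
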