Repository: tez
Updated Branches:
  refs/heads/master 8dcf8a121 -> 1061cf5c3


TEZ-3813. Reduce Object size of MemoryFetchedInput for large jobs (Muhammad 
Samir Khan via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1061cf5c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1061cf5c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1061cf5c

Branch: refs/heads/master
Commit: 1061cf5c310f954ffbf96a3295325b5e2eeb5d9f
Parents: 8dcf8a1
Author: Jonathan Eagles <jeag...@yahoo-inc.com>
Authored: Wed Aug 9 17:40:09 2017 -0500
Committer: Jonathan Eagles <jeag...@yahoo-inc.com>
Committed: Wed Aug 9 17:40:09 2017 -0500

----------------------------------------------------------------------
 tez-runtime-library/findbugs-exclude.xml        |  6 +++
 .../common/readers/UnorderedKVReader.java       |  4 +-
 .../common/shuffle/DiskFetchedInput.java        | 44 +++++++++------
 .../library/common/shuffle/FetchedInput.java    | 56 +++++++++++--------
 .../runtime/library/common/shuffle/Fetcher.java |  2 +-
 .../common/shuffle/LocalDiskFetchedInput.java   | 43 +++++++++------
 .../common/shuffle/MemoryFetchedInput.java      | 57 +++++++++++++-------
 .../common/shuffle/impl/ShuffleManager.java     | 12 ++++-
 .../impl/SimpleFetchedInputAllocator.java       |  8 +--
 .../common/readers/TestUnorderedKVReader.java   |  2 +-
 .../library/common/shuffle/TestFetcher.java     |  3 +-
 .../common/shuffle/impl/TestShuffleManager.java | 12 ++++-
 12 files changed, 166 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml 
b/tez-runtime-library/findbugs-exclude.xml
index 24fe93e..9e55221 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -207,4 +207,10 @@
     <Field name="MAX_BUFFER_SIZE"/>
     <Bug pattern="MS_SHOULD_BE_FINAL"/>
   </Match>
+
+  <Match>
+    <Class 
name="org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput"/>
+    <Method name="getBytes"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index f4400db..60f70ac 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -221,10 +221,10 @@ public class UnorderedKVReader<K, V> extends 
KeyValueReader {
       MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
 
       return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
-          mfi.getBytes(), 0, (int) mfi.getActualSize());
+          mfi.getBytes(), 0, (int) mfi.getSize());
     } else {
       return new IFile.Reader(fetchedInput.getInputStream(),
-          fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead,
+          fetchedInput.getSize(), codec, null, null, ifileReadAhead,
           ifileReadAheadLength, ifileBufferSize);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
index c873af7..5d1c037 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java
@@ -40,21 +40,33 @@ public class DiskFetchedInput extends FetchedInput {
   private final FileSystem localFS;
   private final Path tmpOutputPath;
   private final Path outputPath;
+  private final long size;
 
-  public DiskFetchedInput(long actualSize, long compressedSize,
+  public DiskFetchedInput(long compressedSize,
       InputAttemptIdentifier inputAttemptIdentifier,
       FetchedInputCallback callbackHandler, Configuration conf,
       LocalDirAllocator localDirAllocator, TezTaskOutputFiles 
filenameAllocator)
       throws IOException {
-    super(Type.DISK, actualSize, compressedSize, inputAttemptIdentifier, 
callbackHandler);
+    super(inputAttemptIdentifier, callbackHandler);
 
+    this.size = compressedSize;
     this.localFS = FileSystem.getLocal(conf).getRaw();
     this.outputPath = filenameAllocator.getInputFileForWrite(
-        this.inputAttemptIdentifier.getInputIdentifier(), this
-            .inputAttemptIdentifier.getSpillEventId(), actualSize);
+        this.getInputAttemptIdentifier().getInputIdentifier(), this
+            .getInputAttemptIdentifier().getSpillEventId(), this.size);
     // Files are not clobbered due to the id being appended to the outputPath 
in the tmpPath,
     // otherwise fetches for the same task but from different attempts would 
clobber each other.
-    this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
+    this.tmpOutputPath = outputPath.suffix(String.valueOf(getId()));
+  }
+
+  @Override
+  public Type getType() {
+    return Type.DISK;
+  }
+
+  @Override
+  public long getSize() {
+    return size;
   }
 
   @Override
@@ -68,7 +80,7 @@ public class DiskFetchedInput extends FetchedInput {
   }
 
   public final Path getInputPath() {
-    if (state == State.COMMITTED) {
+    if (isState(State.COMMITTED)) {
       return this.outputPath;
     }
     return this.tmpOutputPath;
@@ -76,8 +88,8 @@ public class DiskFetchedInput extends FetchedInput {
   
   @Override
   public void commit() throws IOException {
-    if (state == State.PENDING) {
-      state = State.COMMITTED;
+    if (isState(State.PENDING)) {
+      setState(State.COMMITTED);
       localFS.rename(tmpOutputPath, outputPath);
       notifyFetchComplete();
     }
@@ -85,8 +97,8 @@ public class DiskFetchedInput extends FetchedInput {
 
   @Override
   public void abort() throws IOException {
-    if (state == State.PENDING) {
-      state = State.ABORTED;
+    if (isState(State.PENDING)) {
+      setState(State.ABORTED);
       // TODO NEWTEZ Maybe defer this to container cleanup
       localFS.delete(tmpOutputPath, false);
       notifyFetchFailure();
@@ -96,10 +108,10 @@ public class DiskFetchedInput extends FetchedInput {
   @Override
   public void free() {
     Preconditions.checkState(
-        state == State.COMMITTED || state == State.ABORTED,
+        isState(State.COMMITTED) || isState(State.ABORTED),
         "FetchedInput can only be freed after it is committed or aborted");
-    if (state == State.COMMITTED) {
-      state = State.FREED;
+    if (isState(State.COMMITTED)) {
+      setState(State.FREED);
       try {
         // TODO NEWTEZ Maybe defer this to container cleanup
         localFS.delete(outputPath, false);
@@ -115,8 +127,8 @@ public class DiskFetchedInput extends FetchedInput {
   @Override
   public String toString() {
     return "DiskFetchedInput [outputPath=" + outputPath
-        + ", inputAttemptIdentifier=" + inputAttemptIdentifier
-        + ", actualSize=" + actualSize + ",compressedSize=" + compressedSize
-        + ", type=" + type + ", id=" + id + ", state=" + state + "]";
+        + ", inputAttemptIdentifier=" + getInputAttemptIdentifier()
+        + ", actualSize=" + getSize()
+        + ", type=" + getType() + ", id=" + getId() + ", state=" + getState() 
+ "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java
index 3e740a0..8982c27 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java
@@ -42,38 +42,52 @@ public abstract class FetchedInput {
 
   private static AtomicInteger ID_GEN = new AtomicInteger(0);
 
-  protected InputAttemptIdentifier inputAttemptIdentifier;
-  protected final long actualSize;
-  protected final long compressedSize;
-  protected final Type type;
-  protected final FetchedInputCallback callback;
-  protected final int id;
-  protected State state;
-
-  public FetchedInput(Type type, long actualSize, long compressedSize,
-      InputAttemptIdentifier inputAttemptIdentifier,
+  private InputAttemptIdentifier inputAttemptIdentifier;
+  private final FetchedInputCallback callback;
+  private final int id;
+  private byte state;
+
+  protected FetchedInput(InputAttemptIdentifier inputAttemptIdentifier,
       FetchedInputCallback callbackHandler) {
-    this.type = type;
-    this.actualSize = actualSize;
-    this.compressedSize = compressedSize;
     this.inputAttemptIdentifier = inputAttemptIdentifier;
     this.callback = callbackHandler;
     this.id = ID_GEN.getAndIncrement();
-    this.state = State.PENDING;
+    this.state = (byte) State.PENDING.ordinal();
   }
 
-  public Type getType() {
-    return this.type;
+  public abstract Type getType();
+
+  protected boolean isState(State state) {
+    return this.state == (byte) state.ordinal();
   }
 
-  public long getActualSize() {
-    return this.actualSize;
+  protected void setState(State state) {
+    this.state = (byte) state.ordinal();
   }
-  
-  public long getCompressedSize() {
-    return this.compressedSize;
+
+  protected State getState() {
+    if (isState(State.PENDING)) {
+      return State.PENDING;
+    }
+    if (isState(State.COMMITTED)) {
+      return State.COMMITTED;
+    }
+    if (isState(State.ABORTED)) {
+      return State.ABORTED;
+    }
+    if (isState(State.FREED)) {
+      return State.FREED;
+    }
+    // Should not get here
+    return null;
   }
 
+  protected int getId() {
+    return this.id;
+  }
+
+  public abstract long getSize();
+
   public InputAttemptIdentifier getInputAttemptIdentifier() {
     return this.inputAttemptIdentifier;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index bf8c83b..9f657e5 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -658,7 +658,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
           idxRecord = getTezIndexRecord(srcAttemptId, reduceId);
 
           fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
-              idxRecord.getRawLength(), idxRecord.getPartLength(), 
srcAttemptId,
+              idxRecord.getPartLength(), srcAttemptId,
               getShuffleInputFileName(srcAttemptId.getPathComponent(), null),
               conf,
               new FetchedInputCallback() {

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java
index 5c63961..0ae8f08 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java
@@ -39,18 +39,30 @@ public class LocalDiskFetchedInput extends FetchedInput {
   private final Path inputFile;
   private final FileSystem localFS;
   private final long startOffset;
+  private final long size;
 
-  public LocalDiskFetchedInput(long startOffset, long actualSize, long 
compressedSize,
+  public LocalDiskFetchedInput(long startOffset, long compressedSize,
                                InputAttemptIdentifier inputAttemptIdentifier, 
Path inputFile,
                                Configuration conf, FetchedInputCallback 
callbackHandler)
       throws IOException {
-    super(Type.DISK_DIRECT, actualSize, compressedSize, 
inputAttemptIdentifier, callbackHandler);
+    super(inputAttemptIdentifier, callbackHandler);
+    this.size = compressedSize;
     this.startOffset = startOffset;
     this.inputFile = inputFile;
     localFS = FileSystem.getLocal(conf);
   }
 
   @Override
+  public Type getType() {
+    return Type.DISK_DIRECT;
+  }
+
+  @Override
+  public long getSize() {
+    return size;
+  }
+
+  @Override
   public OutputStream getOutputStream() throws IOException {
       throw new IOException("Output Stream is not supported for " + 
this.toString());
   }
@@ -59,21 +71,21 @@ public class LocalDiskFetchedInput extends FetchedInput {
   public InputStream getInputStream() throws IOException {
     FSDataInputStream inputStream = localFS.open(inputFile);
     inputStream.seek(startOffset);
-    return new BoundedInputStream(inputStream, compressedSize);
+    return new BoundedInputStream(inputStream, getSize());
   }
 
   @Override
   public void commit() {
-    if (state == State.PENDING) {
-      state = State.COMMITTED;
+    if (isState(State.PENDING)) {
+      setState(State.COMMITTED);
       notifyFetchComplete();
     }
   }
 
   @Override
   public void abort() {
-    if (state == State.PENDING) {
-      state = State.ABORTED;
+    if (isState(State.PENDING)) {
+      setState(State.ABORTED);
       notifyFetchFailure();
     }
   }
@@ -81,10 +93,10 @@ public class LocalDiskFetchedInput extends FetchedInput {
   @Override
   public void free() {
     Preconditions.checkState(
-        state == State.COMMITTED || state == State.ABORTED,
+        isState(State.COMMITTED) || isState(State.ABORTED),
         "FetchedInput can only be freed after it is committed or aborted");
-    if (state == State.COMMITTED) { // ABORTED would have already called 
cleanup
-      state = State.FREED;
+    if (isState(State.COMMITTED)) { // ABORTED would have already called 
cleanup
+      setState(State.FREED);
       notifyFreedResource();
     }
   }
@@ -93,12 +105,11 @@ public class LocalDiskFetchedInput extends FetchedInput {
   public String toString() {
     return "LocalDiskFetchedInput [inputFile path =" + inputFile +
         ", offset" + startOffset +
-        ", actualSize=" + actualSize +
-        ", compressedSize=" + compressedSize +
-        ", inputAttemptIdentifier=" + inputAttemptIdentifier +
-        ", type=" + type +
-        ", id=" + id +
-        ", state=" + state + "]";
+        ", compressedSize=" + getSize() +
+        ", inputAttemptIdentifier=" + getInputAttemptIdentifier() +
+        ", type=" + getType() +
+        ", id=" + getId() +
+        ", state=" + getState() + "]";
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
index 78f1f3b..d0c3e77 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
@@ -29,41 +29,54 @@ import com.google.common.base.Preconditions;
 
 public class MemoryFetchedInput extends FetchedInput {
 
-  private BoundedByteArrayOutputStream byteStream;
+  private byte[] byteArray;
 
-  public MemoryFetchedInput(long actualSize, long compressedSize,
+  public MemoryFetchedInput(long actualSize,
       InputAttemptIdentifier inputAttemptIdentifier,
       FetchedInputCallback callbackHandler) {
-    super(Type.MEMORY, actualSize, compressedSize, inputAttemptIdentifier, 
callbackHandler);
-    this.byteStream = new BoundedByteArrayOutputStream((int) actualSize);
+    super(inputAttemptIdentifier, callbackHandler);
+    this.byteArray = new byte[(int) actualSize];
+  }
+
+  @Override
+  public Type getType() {
+    return Type.MEMORY;
+  }
+
+  @Override
+  public long getSize() {
+    if (this.byteArray == null) {
+      return 0;
+    }
+    return this.byteArray.length;
   }
 
   @Override
   public OutputStream getOutputStream() {
-    return byteStream;
+    return new InMemoryBoundedByteArrayOutputStream(byteArray);
   }
 
   @Override
   public InputStream getInputStream() {
-    return new NonSyncByteArrayInputStream(byteStream.getBuffer());
+    return new NonSyncByteArrayInputStream(byteArray);
   }
 
   public byte[] getBytes() {
-    return byteStream.getBuffer();
+    return byteArray;
   }
   
   @Override
   public void commit() {
-    if (state == State.PENDING) {
-      state = State.COMMITTED;
+    if (isState(State.PENDING)) {
+      setState(State.COMMITTED);
       notifyFetchComplete();
     }
   }
 
   @Override
   public void abort() {
-    if (state == State.PENDING) {
-      state = State.ABORTED;
+    if (isState(State.PENDING)) {
+      setState(State.ABORTED);
       notifyFetchFailure();
     }
   }
@@ -71,20 +84,28 @@ public class MemoryFetchedInput extends FetchedInput {
   @Override
   public void free() {
     Preconditions.checkState(
-        state == State.COMMITTED || state == State.ABORTED,
+        isState(State.COMMITTED) || isState(State.ABORTED),
         "FetchedInput can only be freed after it is committed or aborted");
-    if (state == State.COMMITTED) { // ABORTED would have already called 
cleanup
-      state = State.FREED;
-      this.byteStream = null;
+    if (isState(State.COMMITTED)) { // ABORTED would have already called 
cleanup
+      setState(State.FREED);
       notifyFreedResource();
+      // Set this to null AFTER notifyFreedResource() so that getSize()
+      // returns the correct size
+      this.byteArray = null;
     }
   }
 
   @Override
   public String toString() {
     return "MemoryFetchedInput [inputAttemptIdentifier="
-        + inputAttemptIdentifier + ", actualSize=" + actualSize
-        + ", compressedSize=" + compressedSize + ", type=" + type + ", id="
-        + id + ", state=" + state + "]";
+        + getInputAttemptIdentifier() + ", size=" + getSize()
+        + ", type=" + getType() + ", id="
+        + getId() + ", state=" + getState() + "]";
+  }
+
+  private static class InMemoryBoundedByteArrayOutputStream extends 
BoundedByteArrayOutputStream {
+    InMemoryBoundedByteArrayOutputStream(byte[] array) {
+      super(array, 0, array.length);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 24fb12b..e142228 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -914,7 +914,17 @@ public class ShuffleManager implements FetcherCallback {
   static class NullFetchedInput extends FetchedInput {
 
     public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
-      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
+      super(inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public Type getType() {
+      return Type.MEMORY;
+    }
+
+    @Override
+    public long getSize() {
+      return -1;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
index f939cd1..6072c03 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java
@@ -140,7 +140,7 @@ public class SimpleFetchedInputAllocator implements 
FetchedInputAllocator,
       InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
     if (actualSize > maxSingleShuffleLimit
         || this.usedMemory + actualSize > this.memoryLimit) {
-      return new DiskFetchedInput(actualSize, compressedSize,
+      return new DiskFetchedInput(compressedSize,
           inputAttemptIdentifier, this, conf, localDirAllocator,
           fileNameAllocator);
     } else {
@@ -149,7 +149,7 @@ public class SimpleFetchedInputAllocator implements 
FetchedInputAllocator,
         LOG.info(srcNameTrimmed + ": " + "Used memory after allocating " + 
actualSize + " : " +
             usedMemory);
       }
-      return new MemoryFetchedInput(actualSize, compressedSize, 
inputAttemptIdentifier, this);
+      return new MemoryFetchedInput(actualSize, inputAttemptIdentifier, this);
     }
   }
 
@@ -160,7 +160,7 @@ public class SimpleFetchedInputAllocator implements 
FetchedInputAllocator,
 
     switch (type) {
     case DISK:
-      return new DiskFetchedInput(actualSize, compressedSize,
+      return new DiskFetchedInput(compressedSize,
           inputAttemptIdentifier, this, conf, localDirAllocator,
           fileNameAllocator);
     default:
@@ -197,7 +197,7 @@ public class SimpleFetchedInputAllocator implements 
FetchedInputAllocator,
     case DISK:
       break;
     case MEMORY:
-      unreserve(fetchedInput.getActualSize());
+      unreserve(((MemoryFetchedInput) fetchedInput).getSize());
       break;
     default:
       throw new TezUncheckedException("InputType: " + fetchedInput.getType()

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
index c49a423..6fef944 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -96,7 +96,7 @@ public class TestUnorderedKVReader {
     createIFile(outputPath, 1);
 
     final LinkedList<LocalDiskFetchedInput> inputs = new 
LinkedList<LocalDiskFetchedInput>();
-    LocalDiskFetchedInput realFetchedInput = new LocalDiskFetchedInput(0, 
rawLen, compLen, new
+    LocalDiskFetchedInput realFetchedInput = new LocalDiskFetchedInput(0, 
compLen, new
         InputAttemptIdentifier(0, 0), outputPath, defaultConf, new 
FetchedInputCallback() {
       @Override
       public void fetchComplete(FetchedInput fetchedInput) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index b031154..db9c7af 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -246,8 +246,7 @@ public class TestFetcher {
         SHUFFLE_INPUT_FILE_PREFIX + pathComponent);
     Assert.assertTrue("success callback fs", f.getLocalFS() instanceof 
LocalFileSystem);
     Assert.assertEquals("success callback filesystem", f.getStartOffset(), p * 
10);
-    Assert.assertEquals("success callback raw size", f.getActualSize(), p * 
1000);
-    Assert.assertEquals("success callback compressed size", 
f.getCompressedSize(), p * 100);
+    Assert.assertEquals("success callback compressed size", f.getSize(), p * 
100);
     Assert.assertEquals("success callback input id", 
f.getInputAttemptIdentifier(), srcAttempId.expand(0));
     Assert.assertEquals("success callback type", f.getType(), 
FetchedInput.Type.DISK_DIRECT);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
index 23248ed..103f83d 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -298,7 +298,17 @@ public class TestShuffleManager {
   static class TestFetchedInput extends FetchedInput {
 
     public TestFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
-      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
+      super(inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public long getSize() {
+      return -1;
+    }
+
+    @Override
+    public Type getType() {
+      return Type.MEMORY;
     }
 
     @Override

Reply via email to