Repository: tez Updated Branches: refs/heads/master 8dcf8a121 -> 1061cf5c3
TEZ-3813. Reduce Object size of MemoryFetchedInput for large jobs (Muhammad Samir Khan via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1061cf5c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1061cf5c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1061cf5c Branch: refs/heads/master Commit: 1061cf5c310f954ffbf96a3295325b5e2eeb5d9f Parents: 8dcf8a1 Author: Jonathan Eagles <jeag...@yahoo-inc.com> Authored: Wed Aug 9 17:40:09 2017 -0500 Committer: Jonathan Eagles <jeag...@yahoo-inc.com> Committed: Wed Aug 9 17:40:09 2017 -0500 ---------------------------------------------------------------------- tez-runtime-library/findbugs-exclude.xml | 6 +++ .../common/readers/UnorderedKVReader.java | 4 +- .../common/shuffle/DiskFetchedInput.java | 44 +++++++++------ .../library/common/shuffle/FetchedInput.java | 56 +++++++++++-------- .../runtime/library/common/shuffle/Fetcher.java | 2 +- .../common/shuffle/LocalDiskFetchedInput.java | 43 +++++++++------ .../common/shuffle/MemoryFetchedInput.java | 57 +++++++++++++------- .../common/shuffle/impl/ShuffleManager.java | 12 ++++- .../impl/SimpleFetchedInputAllocator.java | 8 +-- .../common/readers/TestUnorderedKVReader.java | 2 +- .../library/common/shuffle/TestFetcher.java | 3 +- .../common/shuffle/impl/TestShuffleManager.java | 12 ++++- 12 files changed, 166 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml index 24fe93e..9e55221 100644 --- a/tez-runtime-library/findbugs-exclude.xml +++ b/tez-runtime-library/findbugs-exclude.xml @@ -207,4 +207,10 @@ <Field name="MAX_BUFFER_SIZE"/> <Bug pattern="MS_SHOULD_BE_FINAL"/> </Match> + + <Match> + <Class name="org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput"/> + <Method name="getBytes"/> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index f4400db..60f70ac 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -221,10 +221,10 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput; return new InMemoryReader(null, mfi.getInputAttemptIdentifier(), - mfi.getBytes(), 0, (int) mfi.getActualSize()); + mfi.getBytes(), 0, (int) mfi.getSize()); } else { return new IFile.Reader(fetchedInput.getInputStream(), - fetchedInput.getCompressedSize(), codec, null, null, ifileReadAhead, + fetchedInput.getSize(), codec, null, null, ifileReadAhead, ifileReadAheadLength, ifileBufferSize); } } http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java index c873af7..5d1c037 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/DiskFetchedInput.java @@ -40,21 +40,33 @@ public class DiskFetchedInput extends FetchedInput { private final FileSystem localFS; private final Path tmpOutputPath; private final Path outputPath; + private final long size; - public DiskFetchedInput(long actualSize, long compressedSize, + public DiskFetchedInput(long compressedSize, InputAttemptIdentifier inputAttemptIdentifier, FetchedInputCallback callbackHandler, Configuration conf, LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator) throws IOException { - super(Type.DISK, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler); + super(inputAttemptIdentifier, callbackHandler); + this.size = compressedSize; this.localFS = FileSystem.getLocal(conf).getRaw(); this.outputPath = filenameAllocator.getInputFileForWrite( - this.inputAttemptIdentifier.getInputIdentifier(), this - .inputAttemptIdentifier.getSpillEventId(), actualSize); + this.getInputAttemptIdentifier().getInputIdentifier(), this + .getInputAttemptIdentifier().getSpillEventId(), this.size); // Files are not clobbered due to the id being appended to the outputPath in the tmpPath, // otherwise fetches for the same task but from different attempts would clobber each other. - this.tmpOutputPath = outputPath.suffix(String.valueOf(id)); + this.tmpOutputPath = outputPath.suffix(String.valueOf(getId())); + } + + @Override + public Type getType() { + return Type.DISK; + } + + @Override + public long getSize() { + return size; } @Override @@ -68,7 +80,7 @@ public class DiskFetchedInput extends FetchedInput { } public final Path getInputPath() { - if (state == State.COMMITTED) { + if (isState(State.COMMITTED)) { return this.outputPath; } return this.tmpOutputPath; @@ -76,8 +88,8 @@ public class DiskFetchedInput extends FetchedInput { @Override public void commit() throws IOException { - if (state == State.PENDING) { - state = State.COMMITTED; + if (isState(State.PENDING)) { + setState(State.COMMITTED); localFS.rename(tmpOutputPath, outputPath); notifyFetchComplete(); } @@ -85,8 +97,8 @@ public class DiskFetchedInput extends FetchedInput { @Override public void abort() throws IOException { - if (state == State.PENDING) { - state = State.ABORTED; + if (isState(State.PENDING)) { + setState(State.ABORTED); // TODO NEWTEZ Maybe defer this to container cleanup localFS.delete(tmpOutputPath, false); notifyFetchFailure(); @@ -96,10 +108,10 @@ public class DiskFetchedInput extends FetchedInput { @Override public void free() { Preconditions.checkState( - state == State.COMMITTED || state == State.ABORTED, + isState(State.COMMITTED) || isState(State.ABORTED), "FetchedInput can only be freed after it is committed or aborted"); - if (state == State.COMMITTED) { - state = State.FREED; + if (isState(State.COMMITTED)) { + setState(State.FREED); try { // TODO NEWTEZ Maybe defer this to container cleanup localFS.delete(outputPath, false); @@ -115,8 +127,8 @@ public class DiskFetchedInput extends FetchedInput { @Override public String toString() { return "DiskFetchedInput [outputPath=" + outputPath - + ", inputAttemptIdentifier=" + inputAttemptIdentifier - + ", actualSize=" + actualSize + ",compressedSize=" + compressedSize - + ", type=" + type + ", id=" + id + ", state=" + state + "]"; + + ", inputAttemptIdentifier=" + getInputAttemptIdentifier() + + ", actualSize=" + getSize() + + ", type=" + getType() + ", id=" + getId() + ", state=" + getState() + "]"; } } http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java index 3e740a0..8982c27 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchedInput.java @@ -42,38 +42,52 @@ public abstract class FetchedInput { private static AtomicInteger ID_GEN = new AtomicInteger(0); - protected InputAttemptIdentifier inputAttemptIdentifier; - protected final long actualSize; - protected final long compressedSize; - protected final Type type; - protected final FetchedInputCallback callback; - protected final int id; - protected State state; - - public FetchedInput(Type type, long actualSize, long compressedSize, - InputAttemptIdentifier inputAttemptIdentifier, + private InputAttemptIdentifier inputAttemptIdentifier; + private final FetchedInputCallback callback; + private final int id; + private byte state; + + protected FetchedInput(InputAttemptIdentifier inputAttemptIdentifier, FetchedInputCallback callbackHandler) { - this.type = type; - this.actualSize = actualSize; - this.compressedSize = compressedSize; this.inputAttemptIdentifier = inputAttemptIdentifier; this.callback = callbackHandler; this.id = ID_GEN.getAndIncrement(); - this.state = State.PENDING; + this.state = (byte) State.PENDING.ordinal(); } - public Type getType() { - return this.type; + public abstract Type getType(); + + protected boolean isState(State state) { + return this.state == (byte) state.ordinal(); } - public long getActualSize() { - return this.actualSize; + protected void setState(State state) { + this.state = (byte) state.ordinal(); } - - public long getCompressedSize() { - return this.compressedSize; + + protected State getState() { + if (isState(State.PENDING)) { + return State.PENDING; + } + if (isState(State.COMMITTED)) { + return State.COMMITTED; + } + if (isState(State.ABORTED)) { + return State.ABORTED; + } + if (isState(State.FREED)) { + return State.FREED; + } + // Should not get here + return null; } + protected int getId() { + return this.id; + } + + public abstract long getSize(); + public InputAttemptIdentifier getInputAttemptIdentifier() { return this.inputAttemptIdentifier; } http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index bf8c83b..9f657e5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -658,7 +658,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { idxRecord = getTezIndexRecord(srcAttemptId, reduceId); fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(), - idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId, + idxRecord.getPartLength(), srcAttemptId, getShuffleInputFileName(srcAttemptId.getPathComponent(), null), conf, new FetchedInputCallback() { http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java index 5c63961..0ae8f08 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/LocalDiskFetchedInput.java @@ -39,18 +39,30 @@ public class LocalDiskFetchedInput extends FetchedInput { private final Path inputFile; private final FileSystem localFS; private final long startOffset; + private final long size; - public LocalDiskFetchedInput(long startOffset, long actualSize, long compressedSize, + public LocalDiskFetchedInput(long startOffset, long compressedSize, InputAttemptIdentifier inputAttemptIdentifier, Path inputFile, Configuration conf, FetchedInputCallback callbackHandler) throws IOException { - super(Type.DISK_DIRECT, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler); + super(inputAttemptIdentifier, callbackHandler); + this.size = compressedSize; this.startOffset = startOffset; this.inputFile = inputFile; localFS = FileSystem.getLocal(conf); } @Override + public Type getType() { + return Type.DISK_DIRECT; + } + + @Override + public long getSize() { + return size; + } + + @Override public OutputStream getOutputStream() throws IOException { throw new IOException("Output Stream is not supported for " + this.toString()); } @@ -59,21 +71,21 @@ public class LocalDiskFetchedInput extends FetchedInput { public InputStream getInputStream() throws IOException { FSDataInputStream inputStream = localFS.open(inputFile); inputStream.seek(startOffset); - return new BoundedInputStream(inputStream, compressedSize); + return new BoundedInputStream(inputStream, getSize()); } @Override public void commit() { - if (state == State.PENDING) { - state = State.COMMITTED; + if (isState(State.PENDING)) { + setState(State.COMMITTED); notifyFetchComplete(); } } @Override public void abort() { - if (state == State.PENDING) { - state = State.ABORTED; + if (isState(State.PENDING)) { + setState(State.ABORTED); notifyFetchFailure(); } } @@ -81,10 +93,10 @@ public class LocalDiskFetchedInput extends FetchedInput { @Override public void free() { Preconditions.checkState( - state == State.COMMITTED || state == State.ABORTED, + isState(State.COMMITTED) || isState(State.ABORTED), "FetchedInput can only be freed after it is committed or aborted"); - if (state == State.COMMITTED) { // ABORTED would have already called cleanup - state = State.FREED; + if (isState(State.COMMITTED)) { // ABORTED would have already called cleanup + setState(State.FREED); notifyFreedResource(); } } @@ -93,12 +105,11 @@ public class LocalDiskFetchedInput extends FetchedInput { public String toString() { return "LocalDiskFetchedInput [inputFile path =" + inputFile + ", offset" + startOffset + - ", actualSize=" + actualSize + - ", compressedSize=" + compressedSize + - ", inputAttemptIdentifier=" + inputAttemptIdentifier + - ", type=" + type + - ", id=" + id + - ", state=" + state + "]"; + ", compressedSize=" + getSize() + + ", inputAttemptIdentifier=" + getInputAttemptIdentifier() + + ", type=" + getType() + + ", id=" + getId() + + ", state=" + getState() + "]"; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java index 78f1f3b..d0c3e77 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java @@ -29,41 +29,54 @@ import com.google.common.base.Preconditions; public class MemoryFetchedInput extends FetchedInput { - private BoundedByteArrayOutputStream byteStream; + private byte[] byteArray; - public MemoryFetchedInput(long actualSize, long compressedSize, + public MemoryFetchedInput(long actualSize, InputAttemptIdentifier inputAttemptIdentifier, FetchedInputCallback callbackHandler) { - super(Type.MEMORY, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler); - this.byteStream = new BoundedByteArrayOutputStream((int) actualSize); + super(inputAttemptIdentifier, callbackHandler); + this.byteArray = new byte[(int) actualSize]; + } + + @Override + public Type getType() { + return Type.MEMORY; + } + + @Override + public long getSize() { + if (this.byteArray == null) { + return 0; + } + return this.byteArray.length; } @Override public OutputStream getOutputStream() { - return byteStream; + return new InMemoryBoundedByteArrayOutputStream(byteArray); } @Override public InputStream getInputStream() { - return new NonSyncByteArrayInputStream(byteStream.getBuffer()); + return new NonSyncByteArrayInputStream(byteArray); } public byte[] getBytes() { - return byteStream.getBuffer(); + return byteArray; } @Override public void commit() { - if (state == State.PENDING) { - state = State.COMMITTED; + if (isState(State.PENDING)) { + setState(State.COMMITTED); notifyFetchComplete(); } } @Override public void abort() { - if (state == State.PENDING) { - state = State.ABORTED; + if (isState(State.PENDING)) { + setState(State.ABORTED); notifyFetchFailure(); } } @@ -71,20 +84,28 @@ public class MemoryFetchedInput extends FetchedInput { @Override public void free() { Preconditions.checkState( - state == State.COMMITTED || state == State.ABORTED, + isState(State.COMMITTED) || isState(State.ABORTED), "FetchedInput can only be freed after it is committed or aborted"); - if (state == State.COMMITTED) { // ABORTED would have already called cleanup - state = State.FREED; - this.byteStream = null; + if (isState(State.COMMITTED)) { // ABORTED would have already called cleanup + setState(State.FREED); notifyFreedResource(); + // Set this to null AFTER notifyFreedResource() so that getSize() + // returns the correct size + this.byteArray = null; } } @Override public String toString() { return "MemoryFetchedInput [inputAttemptIdentifier=" - + inputAttemptIdentifier + ", actualSize=" + actualSize - + ", compressedSize=" + compressedSize + ", type=" + type + ", id=" - + id + ", state=" + state + "]"; + + getInputAttemptIdentifier() + ", size=" + getSize() + + ", type=" + getType() + ", id=" + + getId() + ", state=" + getState() + "]"; + } + + private static class InMemoryBoundedByteArrayOutputStream extends BoundedByteArrayOutputStream { + InMemoryBoundedByteArrayOutputStream(byte[] array) { + super(array, 0, array.length); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 24fb12b..e142228 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -914,7 +914,17 @@ public class ShuffleManager implements FetcherCallback { static class NullFetchedInput extends FetchedInput { public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) { - super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null); + super(inputAttemptIdentifier, null); + } + + @Override + public Type getType() { + return Type.MEMORY; + } + + @Override + public long getSize() { + return -1; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java index f939cd1..6072c03 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java @@ -140,7 +140,7 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, InputAttemptIdentifier inputAttemptIdentifier) throws IOException { if (actualSize > maxSingleShuffleLimit || this.usedMemory + actualSize > this.memoryLimit) { - return new DiskFetchedInput(actualSize, compressedSize, + return new DiskFetchedInput(compressedSize, inputAttemptIdentifier, this, conf, localDirAllocator, fileNameAllocator); } else { @@ -149,7 +149,7 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, LOG.info(srcNameTrimmed + ": " + "Used memory after allocating " + actualSize + " : " + usedMemory); } - return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this); + return new MemoryFetchedInput(actualSize, inputAttemptIdentifier, this); } } @@ -160,7 +160,7 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, switch (type) { case DISK: - return new DiskFetchedInput(actualSize, compressedSize, + return new DiskFetchedInput(compressedSize, inputAttemptIdentifier, this, conf, localDirAllocator, fileNameAllocator); default: @@ -197,7 +197,7 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, case DISK: break; case MEMORY: - unreserve(fetchedInput.getActualSize()); + unreserve(((MemoryFetchedInput) fetchedInput).getSize()); break; default: throw new TezUncheckedException("InputType: " + fetchedInput.getType() http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java index c49a423..6fef944 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java @@ -96,7 +96,7 @@ public class TestUnorderedKVReader { createIFile(outputPath, 1); final LinkedList<LocalDiskFetchedInput> inputs = new LinkedList<LocalDiskFetchedInput>(); - LocalDiskFetchedInput realFetchedInput = new LocalDiskFetchedInput(0, rawLen, compLen, new + LocalDiskFetchedInput realFetchedInput = new LocalDiskFetchedInput(0, compLen, new InputAttemptIdentifier(0, 0), outputPath, defaultConf, new FetchedInputCallback() { @Override public void fetchComplete(FetchedInput fetchedInput) { http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index b031154..db9c7af 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -246,8 +246,7 @@ public class TestFetcher { SHUFFLE_INPUT_FILE_PREFIX + pathComponent); Assert.assertTrue("success callback fs", f.getLocalFS() instanceof LocalFileSystem); Assert.assertEquals("success callback filesystem", f.getStartOffset(), p * 10); - Assert.assertEquals("success callback raw size", f.getActualSize(), p * 1000); - Assert.assertEquals("success callback compressed size", f.getCompressedSize(), p * 100); + Assert.assertEquals("success callback compressed size", f.getSize(), p * 100); Assert.assertEquals("success callback input id", f.getInputAttemptIdentifier(), srcAttempId.expand(0)); Assert.assertEquals("success callback type", f.getType(), FetchedInput.Type.DISK_DIRECT); } http://git-wip-us.apache.org/repos/asf/tez/blob/1061cf5c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index 23248ed..103f83d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -298,7 +298,17 @@ public class TestShuffleManager { static class TestFetchedInput extends FetchedInput { public TestFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) { - super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null); + super(inputAttemptIdentifier, null); + } + + @Override + public long getSize() { + return -1; + } + + @Override + public Type getType() { + return Type.MEMORY; } @Override