[FLINK-8587][runtime] Drop unused AdaptiveSpanningRecordDeserializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c0f4d4a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c0f4d4a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c0f4d4a Branch: refs/heads/master Commit: 2c0f4d4a3d6a801cdb5f2ae45fd7ed74811b634c Parents: 78df079 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Wed Jan 31 14:16:34 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:37 2018 +0100 ---------------------------------------------------------------------- .../AdaptiveSpanningRecordDeserializer.java | 608 ------------------- .../SpanningRecordSerializationTest.java | 28 +- .../RecordCollectingResultPartitionWriter.java | 5 +- ...dOrEventCollectingResultPartitionWriter.java | 5 +- 4 files changed, 14 insertions(+), 632 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2c0f4d4a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java deleted file mode 100644 index 598216a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ /dev/null @@ -1,608 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.serialization; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.network.buffer.Buffer; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UTFDataFormatException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -/** - * @param <T> The type of the record to be deserialized. - */ -public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> { - - private final NonSpanningWrapper nonSpanningWrapper; - - private final SpanningWrapper spanningWrapper; - - private Buffer currentBuffer; - - public AdaptiveSpanningRecordDeserializer() { - this.nonSpanningWrapper = new NonSpanningWrapper(); - this.spanningWrapper = new SpanningWrapper(); - } - - @Override - public void setNextBuffer(Buffer buffer) throws IOException { - currentBuffer = buffer; - - MemorySegment segment = buffer.getMemorySegment(); - int numBytes = buffer.getSize(); - - setNextMemorySegment(segment, numBytes); - } - - @Override - public Buffer getCurrentBuffer () { - Buffer tmp = currentBuffer; - currentBuffer = null; - return tmp; - } - - @Override - public void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException { - // check if some spanning record deserialization is pending - if (this.spanningWrapper.getNumGatheredBytes() > 0) { - this.spanningWrapper.addNextChunkFromMemorySegment(segment, numBytes); - } - else { - this.nonSpanningWrapper.initializeFromMemorySegment(segment, 0, numBytes); - } - } - - @Override - public DeserializationResult getNextRecord(T target) throws IOException { - // always check the non-spanning wrapper first. - // this should be the majority of the cases for small records - // for large records, this portion of the work is very small in comparison anyways - - int nonSpanningRemaining = this.nonSpanningWrapper.remaining(); - - // check if we can get a full length; - if (nonSpanningRemaining >= 4) { - int len = this.nonSpanningWrapper.readInt(); - - if (len <= nonSpanningRemaining - 4) { - // we can get a full record from here - target.read(this.nonSpanningWrapper); - - return (this.nonSpanningWrapper.remaining() == 0) ? - DeserializationResult.LAST_RECORD_FROM_BUFFER : - DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; - } else { - // we got the length, but we need the rest from the spanning deserializer - // and need to wait for more buffers - this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len); - this.nonSpanningWrapper.clear(); - return DeserializationResult.PARTIAL_RECORD; - } - } else if (nonSpanningRemaining > 0) { - // we have an incomplete length - // add our part of the length to the length buffer - this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper); - this.nonSpanningWrapper.clear(); - return DeserializationResult.PARTIAL_RECORD; - } - - // spanning record case - if (this.spanningWrapper.hasFullRecord()) { - // get the full record - target.read(this.spanningWrapper); - - // move the remainder to the non-spanning wrapper - // this does not copy it, only sets the memory segment - this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper); - this.spanningWrapper.clear(); - - return (this.nonSpanningWrapper.remaining() == 0) ? - DeserializationResult.LAST_RECORD_FROM_BUFFER : - DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; -// } else if (this.spanningWrapper.getNumGatheredBytes() == 0) { -// // error case. we are in the spanning deserializer, but it has no bytes, yet -// throw new IllegalStateException(); - } else { - return DeserializationResult.PARTIAL_RECORD; - } - } - - @Override - public void clear() { - this.nonSpanningWrapper.clear(); - this.spanningWrapper.clear(); - } - - @Override - public boolean hasUnfinishedData() { - return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0; - } - - // ----------------------------------------------------------------------------------------------------------------- - - private static final class NonSpanningWrapper implements DataInputView { - - private MemorySegment segment; - - private int limit; - - private int position; - - private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding - private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding - - int remaining() { - return this.limit - this.position; - } - - void clear() { - this.segment = null; - this.limit = 0; - this.position = 0; - } - - void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) { - this.segment = seg; - this.position = position; - this.limit = leftOverLimit; - } - - // ------------------------------------------------------------------------------------------------------------- - // DataInput specific methods - // ------------------------------------------------------------------------------------------------------------- - - @Override - public final void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - @Override - public final void readFully(byte[] b, int off, int len) throws IOException { - if (off < 0 || len < 0 || off + len > b.length) { - throw new IndexOutOfBoundsException(); - } - - this.segment.get(this.position, b, off, len); - this.position += len; - } - - @Override - public final boolean readBoolean() throws IOException { - return readByte() == 1; - } - - @Override - public final byte readByte() throws IOException { - return this.segment.get(this.position++); - } - - @Override - public final int readUnsignedByte() throws IOException { - return readByte() & 0xff; - } - - @Override - public final short readShort() throws IOException { - final short v = this.segment.getShortBigEndian(this.position); - this.position += 2; - return v; - } - - @Override - public final int readUnsignedShort() throws IOException { - final int v = this.segment.getShortBigEndian(this.position) & 0xffff; - this.position += 2; - return v; - } - - @Override - public final char readChar() throws IOException { - final char v = this.segment.getCharBigEndian(this.position); - this.position += 2; - return v; - } - - @Override - public final int readInt() throws IOException { - final int v = this.segment.getIntBigEndian(this.position); - this.position += 4; - return v; - } - - @Override - public final long readLong() throws IOException { - final long v = this.segment.getLongBigEndian(this.position); - this.position += 8; - return v; - } - - @Override - public final float readFloat() throws IOException { - return Float.intBitsToFloat(readInt()); - } - - @Override - public final double readDouble() throws IOException { - return Double.longBitsToDouble(readLong()); - } - - @Override - public final String readLine() throws IOException { - final StringBuilder bld = new StringBuilder(32); - - try { - int b; - while ((b = readUnsignedByte()) != '\n') { - if (b != '\r') { - bld.append((char) b); - } - } - } - catch (EOFException ignored) {} - - if (bld.length() == 0) { - return null; - } - - // trim a trailing carriage return - int len = bld.length(); - if (len > 0 && bld.charAt(len - 1) == '\r') { - bld.setLength(len - 1); - } - return bld.toString(); - } - - @Override - public final String readUTF() throws IOException { - final int utflen = readUnsignedShort(); - - final byte[] bytearr; - final char[] chararr; - - if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) { - bytearr = new byte[utflen]; - this.utfByteBuffer = bytearr; - } else { - bytearr = this.utfByteBuffer; - } - if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) { - chararr = new char[utflen]; - this.utfCharBuffer = chararr; - } else { - chararr = this.utfCharBuffer; - } - - int c, char2, char3; - int count = 0; - int chararrCount = 0; - - readFully(bytearr, 0, utflen); - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - if (c > 127) { - break; - } - count++; - chararr[chararrCount++] = (char) c; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - switch (c >> 4) { - case 0: - case 1: - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - count++; - chararr[chararrCount++] = (char) c; - break; - case 12: - case 13: - count += 2; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = (int) bytearr[count - 1]; - if ((char2 & 0xC0) != 0x80) { - throw new UTFDataFormatException("malformed input around byte " + count); - } - chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); - break; - case 14: - count += 3; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = (int) bytearr[count - 2]; - char3 = (int) bytearr[count - 1]; - if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { - throw new UTFDataFormatException("malformed input around byte " + (count - 1)); - } - chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F))); - break; - default: - throw new UTFDataFormatException("malformed input around byte " + count); - } - } - // The number of chars produced may be less than utflen - return new String(chararr, 0, chararrCount); - } - - @Override - public final int skipBytes(int n) throws IOException { - if (n < 0) { - throw new IllegalArgumentException(); - } - - int toSkip = Math.min(n, remaining()); - this.position += toSkip; - return toSkip; - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - int skippedBytes = skipBytes(numBytes); - - if (skippedBytes < numBytes){ - throw new EOFException("Could not skip " + numBytes + " bytes."); - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null){ - throw new NullPointerException("Byte array b cannot be null."); - } - - if (off < 0){ - throw new IllegalArgumentException("The offset off cannot be negative."); - } - - if (len < 0){ - throw new IllegalArgumentException("The length len cannot be negative."); - } - - int toRead = Math.min(len, remaining()); - this.segment.get(this.position, b, off, toRead); - this.position += toRead; - - return toRead; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - } - - // ----------------------------------------------------------------------------------------------------------------- - - private static final class SpanningWrapper implements DataInputView { - - private final DataOutputSerializer serializationBuffer; - - private final DataInputDeserializer serializationReadBuffer; - - private final ByteBuffer lengthBuffer; - - private int recordLength; - - private MemorySegment leftOverData; - - private int leftOverStart; - - private int leftOverLimit; - - private int recordLimit; - - public SpanningWrapper() { - this.lengthBuffer = ByteBuffer.allocate(4); - this.lengthBuffer.order(ByteOrder.BIG_ENDIAN); - - this.recordLength = -1; - - this.serializationBuffer = new DataOutputSerializer(1024); - this.serializationReadBuffer = new DataInputDeserializer(); - } - - private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException { - // set the length and copy what is available to the buffer - this.recordLength = nextRecordLength; - this.recordLimit = partial.remaining(); - partial.segment.get(this.serializationBuffer, partial.position, partial.remaining()); - this.serializationReadBuffer.setBuffer(this.serializationBuffer.wrapAsByteBuffer()); - } - - private void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException { - // copy what we have to the length buffer - partial.segment.get(partial.position, this.lengthBuffer, partial.remaining()); - } - - private void addNextChunkFromMemorySegment(MemorySegment segment, int numBytesInSegment) throws IOException { - int segmentPosition = 0; - - // check where to go. if we have a partial length, we need to complete it first - if (this.lengthBuffer.position() > 0) { - int toPut = Math.min(this.lengthBuffer.remaining(), numBytesInSegment); - segment.get(0, this.lengthBuffer, toPut); - - // did we complete the length? - if (this.lengthBuffer.hasRemaining()) { - return; - } else { - this.recordLength = this.lengthBuffer.getInt(0); - - this.lengthBuffer.clear(); - segmentPosition = toPut; - } - } - - // copy as much as we need or can for this next spanning record - int needed = this.recordLength - this.recordLimit; - int available = numBytesInSegment - segmentPosition; - int toCopy = Math.min(needed, available); - - segment.get(this.serializationBuffer, segmentPosition, toCopy); - this.recordLimit += toCopy; - - if (toCopy < available) { - // there is more data in the segment - this.leftOverData = segment; - this.leftOverStart = segmentPosition + toCopy; - this.leftOverLimit = numBytesInSegment; - } - - // update read view - this.serializationReadBuffer.setBuffer(this.serializationBuffer.wrapAsByteBuffer()); - } - - private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) { - deserializer.clear(); - - if (leftOverData != null) { - deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit); - } - } - - private boolean hasFullRecord() { - return this.recordLength >= 0 && this.recordLimit >= this.recordLength; - } - - private int getNumGatheredBytes() { - return this.recordLimit + (this.recordLength >= 0 ? 4 : lengthBuffer.position()) + this.serializationBuffer.length(); - } - - public void clear() { - this.serializationBuffer.clear(); - this.serializationBuffer.pruneBuffer(); - this.serializationReadBuffer.releaseArrays(); - - this.recordLength = -1; - this.lengthBuffer.clear(); - this.leftOverData = null; - this.recordLimit = 0; - } - - // ------------------------------------------------------------------------------------------------------------- - // DataInput specific methods - // ------------------------------------------------------------------------------------------------------------- - - @Override - public void readFully(byte[] b) throws IOException { - this.serializationReadBuffer.readFully(b); - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - this.serializationReadBuffer.readFully(b, off, len); - } - - @Override - public int skipBytes(int n) throws IOException { - return this.serializationReadBuffer.skipBytes(n); - } - - @Override - public boolean readBoolean() throws IOException { - return this.serializationReadBuffer.readBoolean(); - } - - @Override - public byte readByte() throws IOException { - return this.serializationReadBuffer.readByte(); - } - - @Override - public int readUnsignedByte() throws IOException { - return this.serializationReadBuffer.readUnsignedByte(); - } - - @Override - public short readShort() throws IOException { - return this.serializationReadBuffer.readShort(); - } - - @Override - public int readUnsignedShort() throws IOException { - return this.serializationReadBuffer.readUnsignedShort(); - } - - @Override - public char readChar() throws IOException { - return this.serializationReadBuffer.readChar(); - } - - @Override - public int readInt() throws IOException { - return this.serializationReadBuffer.readInt(); - } - - @Override - public long readLong() throws IOException { - return this.serializationReadBuffer.readLong(); - } - - @Override - public float readFloat() throws IOException { - return this.serializationReadBuffer.readFloat(); - } - - @Override - public double readDouble() throws IOException { - return this.serializationReadBuffer.readDouble(); - } - - @Override - public String readLine() throws IOException { - return this.serializationReadBuffer.readLine(); - } - - @Override - public String readUTF() throws IOException { - return this.serializationReadBuffer.readUTF(); - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - this.serializationReadBuffer.skipBytesToRead(numBytes); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return this.serializationReadBuffer.read(b, off, len); - } - - @Override - public int read(byte[] b) throws IOException { - return this.serializationReadBuffer.read(b); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c0f4d4a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index 356f210..abcdb1a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -38,7 +38,7 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils. import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; /** - * Tests for the {@link SpillingAdaptiveSpanningRecordDeserializer} and {@link AdaptiveSpanningRecordDeserializer}. + * Tests for the {@link SpillingAdaptiveSpanningRecordDeserializer}. */ public class SpanningRecordSerializationTest { @@ -47,8 +47,7 @@ public class SpanningRecordSerializationTest { final int segmentSize = 1; final int numValues = 10; - testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); - testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); + testSerializationRoundTrip(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); } @Test @@ -56,8 +55,7 @@ public class SpanningRecordSerializationTest { final int segmentSize = 64; final int numValues = 64; - testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); - testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); + testSerializationRoundTrip(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); } @Test @@ -65,8 +63,7 @@ public class SpanningRecordSerializationTest { final int segmentSize = 31; final int numValues = 248; - testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); - testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); + testSerializationRoundTrip(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); } @Test @@ -74,8 +71,7 @@ public class SpanningRecordSerializationTest { final int segmentSize = 127; final int numValues = 10000; - testNonSpillingDeserializer(Util.randomRecords(numValues), segmentSize); - testSpillingDeserializer(Util.randomRecords(numValues), segmentSize); + testSerializationRoundTrip(Util.randomRecords(numValues), segmentSize); } @Test @@ -95,20 +91,12 @@ public class SpanningRecordSerializationTest { } } - testNonSpillingDeserializer(originalRecords, segmentSize); - testSpillingDeserializer(originalRecords, segmentSize); + testSerializationRoundTrip(originalRecords, segmentSize); } // ----------------------------------------------------------------------------------------------------------------- - private void testNonSpillingDeserializer(Iterable<SerializationTestType> records, int segmentSize) throws Exception { - RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>(); - RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<>(); - - testSerializationRoundTrip(records, segmentSize, serializer, deserializer); - } - - private void testSpillingDeserializer(Iterable<SerializationTestType> records, int segmentSize) throws Exception { + private static void testSerializationRoundTrip(Iterable<SerializationTestType> records, int segmentSize) throws Exception { RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>(); RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( @@ -118,7 +106,7 @@ public class SpanningRecordSerializationTest { } /** - * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link AdaptiveSpanningRecordDeserializer} + * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link RecordDeserializer} * interact as expected. * * <p>Only a single {@link MemorySegment} will be allocated. http://git-wip-us.apache.org/repos/asf/flink/blob/2c0f4d4a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java index 6356f4f..1285c4e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.io.network.api.writer; -import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.types.Record; @@ -36,7 +36,8 @@ public class RecordCollectingResultPartitionWriter extends AbstractCollectingRes private final List<Record> output; private final Record record = new Record(); - private final RecordDeserializer<Record> deserializer = new AdaptiveSpanningRecordDeserializer<>(); + private final RecordDeserializer<Record> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( + new String[]{System.getProperty("java.io.tmpdir")}); public RecordCollectingResultPartitionWriter(List<Record> output, BufferProvider bufferProvider) { super(bufferProvider); http://git-wip-us.apache.org/repos/asf/flink/blob/2c0f4d4a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java index d1b4570..3d3073b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java @@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.api.writer; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.event.AbstractEvent; -import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.plugable.DeserializationDelegate; @@ -39,7 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class RecordOrEventCollectingResultPartitionWriter<T> extends AbstractCollectingResultPartitionWriter { private final Collection<Object> output; private final NonReusingDeserializationDelegate<T> delegate; - private final RecordDeserializer<DeserializationDelegate<T>> deserializer = new AdaptiveSpanningRecordDeserializer<>(); + private final RecordDeserializer<DeserializationDelegate<T>> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<> + (new String[]{System.getProperty("java.io.tmpdir")}); public RecordOrEventCollectingResultPartitionWriter( Collection<Object> output,