[hotfix][runtime] Drop one of the two clear methods in RecordSerializer. This simplifies an API a little bit.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f59e7b8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f59e7b8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f59e7b8 Branch: refs/heads/master Commit: 8f59e7b8b7113657e393409090f46ac2bfce3bc6 Parents: 058c0ed Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Mon Jan 15 09:37:05 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:10 2018 +0100 ---------------------------------------------------------------------- .../api/serialization/RecordSerializer.java | 24 ++-------- .../serialization/SpanningRecordSerializer.java | 14 +----- .../io/network/api/writer/RecordWriter.java | 47 +++++++------------- .../SpanningRecordSerializationTest.java | 4 +- .../SpanningRecordSerializerTest.java | 19 ++++---- .../network/serialization/LargeRecordsTest.java | 4 +- 6 files changed, 33 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index 1eefc79..9fb656b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -98,30 +98,12 @@ public interface RecordSerializer<T extends IOReadableWritable> { Buffer getCurrentBuffer(); /** - * Resets the target buffer to <tt>null</tt>. 
- * - * <p><strong>NOTE:</strong> After calling this method, <strong>a new target - * buffer is required to continue writing</strong> (see - * {@link #setNextBufferBuilder(BufferBuilder)}).</p> - */ - void clearCurrentBuffer(); - - /** - * Resets the target buffer to <tt>null</tt> and resets internal state set - * up for the record to serialize. - * - * <p><strong>NOTE:</strong> After calling this method, a <strong>new record - * and a new target buffer is required to start writing again</strong> - * (see {@link #setNextBufferBuilder(BufferBuilder)}). If you want to continue - * with the current record, use {@link #clearCurrentBuffer()} instead.</p> + * Clear and release internal state. */ void clear(); /** - * Determines whether data is left, either in the current target buffer or - * in any internal state set up for the record to serialize. - * - * @return <tt>true</tt> if some data is present + * @return <tt>true</tt> if has some serialized data pending copying to the result {@link BufferBuilder}. 
*/ - boolean hasData(); + boolean hasSerializedData(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 330f31c..263ff43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -157,22 +157,12 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R } @Override - public void clearCurrentBuffer() { - targetBuffer = null; - } - - @Override public void clear() { targetBuffer = null; - - // ensure clear state with hasRemaining false (for correct setNextBufferBuilder logic) - dataBuffer.position(dataBuffer.limit()); - lengthBuffer.position(4); } @Override - public boolean hasData() { - // either data in current target buffer or intermediate buffers - return (targetBuffer != null && !targetBuffer.isEmpty()) || lengthBuffer.hasRemaining() || dataBuffer.hasRemaining(); + public boolean hasSerializedData() { + return lengthBuffer.hasRemaining() || dataBuffer.hasRemaining(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 001de19..b47c461 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.Random; import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult; +import static org.apache.flink.util.Preconditions.checkState; /** * A record-oriented runtime result writer. @@ -115,12 +116,7 @@ public class RecordWriter<T extends IOReadableWritable> { SerializationResult result = serializer.addRecord(record); while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - + if (tryWriteAndClearBuffer(targetChannel, serializer)) { // If this was a full record, we are done. Not breaking // out of the loop at this point will lead to another // buffer request before breaking out (that would not be @@ -135,6 +131,7 @@ public class RecordWriter<T extends IOReadableWritable> { result = serializer.setNextBufferBuilder(bufferBuilder); } } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } } @@ -145,14 +142,7 @@ public class RecordWriter<T extends IOReadableWritable> { RecordSerializer<T> serializer = serializers[targetChannel]; synchronized (serializer) { - Buffer buffer = serializer.getCurrentBuffer(); - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - } else if (serializer.hasData()) { - // sanity check - throw new IllegalStateException("No buffer, but serializer has buffered data."); - } + tryWriteAndClearBuffer(targetChannel, serializer); // retain the buffer so that it can be recycled by each channel of targetPartition targetPartition.writeBuffer(eventBuffer.readOnlySlice().retainBuffer(), targetChannel); @@ 
-170,16 +160,7 @@ public class RecordWriter<T extends IOReadableWritable> { RecordSerializer<T> serializer = serializers[targetChannel]; synchronized (serializer) { - try { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - targetPartition.writeBuffer(buffer, targetChannel); - } - } finally { - serializer.clear(); - } + tryWriteAndClearBuffer(targetChannel, serializer); } } } @@ -213,18 +194,24 @@ public class RecordWriter<T extends IOReadableWritable> { * buffer from the serializer state. * * <p><b>Needs to be synchronized on the serializer!</b> + * + * @return true if some data were written */ - private void writeAndClearBuffer( - Buffer buffer, + private boolean tryWriteAndClearBuffer( int targetChannel, RecordSerializer<T> serializer) throws IOException { + Buffer buffer = serializer.getCurrentBuffer(); + if (buffer == null) { + return false; + } + + numBytesOut.inc(buffer.getSizeUnsafe()); try { targetPartition.writeBuffer(buffer, targetChannel); - } - finally { - serializer.clearCurrentBuffer(); + return true; + } finally { + serializer.clear(); } } - } http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index 7f3c3e5..d32e075 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -146,8 +146,6 @@ public class SpanningRecordSerializationTest { // 
deserialize left over records deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), (numBytes % segmentSize)); - serializer.clear(); - while (!serializedRecords.isEmpty()) { SerializationTestType expected = serializedRecords.poll(); @@ -161,7 +159,7 @@ public class SpanningRecordSerializationTest { // assert that all records have been serialized and deserialized Assert.assertEquals(0, numRecords); - Assert.assertFalse(serializer.hasData()); + Assert.assertFalse(serializer.hasSerializedData()); Assert.assertFalse(deserializer.hasUnfinishedData()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java index 9f4dac6..f1f9865 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java @@ -39,30 +39,28 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils. 
public class SpanningRecordSerializerTest { @Test - public void testHasData() throws IOException { + public void testHasSerializedData() throws IOException { final int segmentSize = 16; final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>(); final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT); - Assert.assertFalse(serializer.hasData()); + Assert.assertFalse(serializer.hasSerializedData()); serializer.addRecord(randomIntRecord); - Assert.assertTrue(serializer.hasData()); + Assert.assertTrue(serializer.hasSerializedData()); serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); - Assert.assertTrue(serializer.hasData()); + Assert.assertFalse(serializer.hasSerializedData()); - serializer.clear(); - Assert.assertFalse(serializer.hasData()); - - serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); + serializer.setNextBufferBuilder(createBufferBuilder(8)); serializer.addRecord(randomIntRecord); - Assert.assertTrue(serializer.hasData()); + Assert.assertFalse(serializer.hasSerializedData()); serializer.addRecord(randomIntRecord); - Assert.assertTrue(serializer.hasData()); + // Buffer builder full! 
+ Assert.assertTrue(serializer.hasSerializedData()); } @Test @@ -189,7 +187,6 @@ public class SpanningRecordSerializerTest { while (result.isFullBuffer()) { numBytes -= segmentSize; - result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java index 057b917..460f699 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java @@ -128,7 +128,7 @@ public class LargeRecordsTest { } // might be that the last big records has not yet been fully moved, and a small one is missing - assertFalse(serializer.hasData()); + assertFalse(serializer.hasSerializedData()); assertFalse(deserializer.hasUnfinishedData()); } catch (Exception e) { @@ -226,7 +226,7 @@ public class LargeRecordsTest { } // might be that the last big records has not yet been fully moved, and a small one is missing - assertFalse(serializer.hasData()); + assertFalse(serializer.hasSerializedData()); assertFalse(deserializer.hasUnfinishedData()); } catch (Exception e) {