[hotfix][runtime] Drop one of the two clear methods in RecordSerializer

This simplifies the RecordSerializer API a little bit.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f59e7b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f59e7b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f59e7b8

Branch: refs/heads/master
Commit: 8f59e7b8b7113657e393409090f46ac2bfce3bc6
Parents: 058c0ed
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Mon Jan 15 09:37:05 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:10 2018 +0100

----------------------------------------------------------------------
 .../api/serialization/RecordSerializer.java     | 24 ++--------
 .../serialization/SpanningRecordSerializer.java | 14 +-----
 .../io/network/api/writer/RecordWriter.java     | 47 +++++++-------------
 .../SpanningRecordSerializationTest.java        |  4 +-
 .../SpanningRecordSerializerTest.java           | 19 ++++----
 .../network/serialization/LargeRecordsTest.java |  4 +-
 6 files changed, 33 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 1eefc79..9fb656b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -98,30 +98,12 @@ public interface RecordSerializer<T extends 
IOReadableWritable> {
        Buffer getCurrentBuffer();
 
        /**
-        * Resets the target buffer to <tt>null</tt>.
-        *
-        * <p><strong>NOTE:</strong> After calling this method, <strong>a new 
target
-        * buffer is required to continue writing</strong> (see
-        * {@link #setNextBufferBuilder(BufferBuilder)}).</p>
-        */
-       void clearCurrentBuffer();
-
-       /**
-        * Resets the target buffer to <tt>null</tt> and resets internal state 
set
-        * up for the record to serialize.
-        *
-        * <p><strong>NOTE:</strong> After calling this method, a <strong>new 
record
-        * and a new target buffer is required to start writing again</strong>
-        * (see {@link #setNextBufferBuilder(BufferBuilder)}). If you want to 
continue
-        * with the current record, use {@link #clearCurrentBuffer()} 
instead.</p>
+        * Clear and release internal state.
         */
        void clear();
 
        /**
-        * Determines whether data is left, either in the current target buffer 
or
-        * in any internal state set up for the record to serialize.
-        *
-        * @return <tt>true</tt> if some data is present
+        * @return <tt>true</tt> if has some serialized data pending copying to 
the result {@link BufferBuilder}.
         */
-       boolean hasData();
+       boolean hasSerializedData();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 330f31c..263ff43 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -157,22 +157,12 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        }
 
        @Override
-       public void clearCurrentBuffer() {
-               targetBuffer = null;
-       }
-
-       @Override
        public void clear() {
                targetBuffer = null;
-
-               // ensure clear state with hasRemaining false (for correct 
setNextBufferBuilder logic)
-               dataBuffer.position(dataBuffer.limit());
-               lengthBuffer.position(4);
        }
 
        @Override
-       public boolean hasData() {
-               // either data in current target buffer or intermediate buffers
-               return (targetBuffer != null && !targetBuffer.isEmpty()) || 
lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
+       public boolean hasSerializedData() {
+               return lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 001de19..b47c461 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.Random;
 
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A record-oriented runtime result writer.
@@ -115,12 +116,7 @@ public class RecordWriter<T extends IOReadableWritable> {
                        SerializationResult result = 
serializer.addRecord(record);
 
                        while (result.isFullBuffer()) {
-                               Buffer buffer = serializer.getCurrentBuffer();
-
-                               if (buffer != null) {
-                                       numBytesOut.inc(buffer.getSizeUnsafe());
-                                       writeAndClearBuffer(buffer, 
targetChannel, serializer);
-
+                               if (tryWriteAndClearBuffer(targetChannel, 
serializer)) {
                                        // If this was a full record, we are 
done. Not breaking
                                        // out of the loop at this point will 
lead to another
                                        // buffer request before breaking out 
(that would not be
@@ -135,6 +131,7 @@ public class RecordWriter<T extends IOReadableWritable> {
                                        result = 
serializer.setNextBufferBuilder(bufferBuilder);
                                }
                        }
+                       checkState(!serializer.hasSerializedData(), "All data 
should be written at once");
                }
        }
 
@@ -145,14 +142,7 @@ public class RecordWriter<T extends IOReadableWritable> {
                                RecordSerializer<T> serializer = 
serializers[targetChannel];
 
                                synchronized (serializer) {
-                                       Buffer buffer = 
serializer.getCurrentBuffer();
-                                       if (buffer != null) {
-                                               
numBytesOut.inc(buffer.getSizeUnsafe());
-                                               writeAndClearBuffer(buffer, 
targetChannel, serializer);
-                                       } else if (serializer.hasData()) {
-                                               // sanity check
-                                               throw new 
IllegalStateException("No buffer, but serializer has buffered data.");
-                                       }
+                                       tryWriteAndClearBuffer(targetChannel, 
serializer);
 
                                        // retain the buffer so that it can be 
recycled by each channel of targetPartition
                                        
targetPartition.writeBuffer(eventBuffer.readOnlySlice().retainBuffer(), 
targetChannel);
@@ -170,16 +160,7 @@ public class RecordWriter<T extends IOReadableWritable> {
                        RecordSerializer<T> serializer = 
serializers[targetChannel];
 
                        synchronized (serializer) {
-                               try {
-                                       Buffer buffer = 
serializer.getCurrentBuffer();
-
-                                       if (buffer != null) {
-                                               
numBytesOut.inc(buffer.getSizeUnsafe());
-                                               
targetPartition.writeBuffer(buffer, targetChannel);
-                                       }
-                               } finally {
-                                       serializer.clear();
-                               }
+                               tryWriteAndClearBuffer(targetChannel, 
serializer);
                        }
                }
        }
@@ -213,18 +194,24 @@ public class RecordWriter<T extends IOReadableWritable> {
         * buffer from the serializer state.
         *
         * <p><b>Needs to be synchronized on the serializer!</b>
+        *
+        * @return true if some data were written
         */
-       private void writeAndClearBuffer(
-                       Buffer buffer,
+       private boolean tryWriteAndClearBuffer(
                        int targetChannel,
                        RecordSerializer<T> serializer) throws IOException {
 
+               Buffer buffer = serializer.getCurrentBuffer();
+               if (buffer == null) {
+                       return false;
+               }
+
+               numBytesOut.inc(buffer.getSizeUnsafe());
                try {
                        targetPartition.writeBuffer(buffer, targetChannel);
-               }
-               finally {
-                       serializer.clearCurrentBuffer();
+                       return true;
+               } finally {
+                       serializer.clear();
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 7f3c3e5..d32e075 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -146,8 +146,6 @@ public class SpanningRecordSerializationTest {
                // deserialize left over records
                
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 (numBytes % segmentSize));
 
-               serializer.clear();
-
                while (!serializedRecords.isEmpty()) {
                        SerializationTestType expected = 
serializedRecords.poll();
 
@@ -161,7 +159,7 @@ public class SpanningRecordSerializationTest {
 
                // assert that all records have been serialized and deserialized
                Assert.assertEquals(0, numRecords);
-               Assert.assertFalse(serializer.hasData());
+               Assert.assertFalse(serializer.hasSerializedData());
                Assert.assertFalse(deserializer.hasUnfinishedData());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index 9f4dac6..f1f9865 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -39,30 +39,28 @@ import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 public class SpanningRecordSerializerTest {
 
        @Test
-       public void testHasData() throws IOException {
+       public void testHasSerializedData() throws IOException {
                final int segmentSize = 16;
 
                final SpanningRecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<>();
                final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
-               Assert.assertFalse(serializer.hasData());
+               Assert.assertFalse(serializer.hasSerializedData());
 
                serializer.addRecord(randomIntRecord);
-               Assert.assertTrue(serializer.hasData());
+               Assert.assertTrue(serializer.hasSerializedData());
 
                
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
-               Assert.assertTrue(serializer.hasData());
+               Assert.assertFalse(serializer.hasSerializedData());
 
-               serializer.clear();
-               Assert.assertFalse(serializer.hasData());
-
-               
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
+               serializer.setNextBufferBuilder(createBufferBuilder(8));
 
                serializer.addRecord(randomIntRecord);
-               Assert.assertTrue(serializer.hasData());
+               Assert.assertFalse(serializer.hasSerializedData());
 
                serializer.addRecord(randomIntRecord);
-               Assert.assertTrue(serializer.hasData());
+               // Buffer builder full!
+               Assert.assertTrue(serializer.hasSerializedData());
        }
 
        @Test
@@ -189,7 +187,6 @@ public class SpanningRecordSerializerTest {
 
                                while (result.isFullBuffer()) {
                                        numBytes -= segmentSize;
-
                                        result = 
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f59e7b8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index 057b917..460f699 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -128,7 +128,7 @@ public class LargeRecordsTest {
                        }
                        
                        // might be that the last big records has not yet been 
fully moved, and a small one is missing
-                       assertFalse(serializer.hasData());
+                       assertFalse(serializer.hasSerializedData());
                        assertFalse(deserializer.hasUnfinishedData());
                }
                catch (Exception e) {
@@ -226,7 +226,7 @@ public class LargeRecordsTest {
                        }
                        
                        // might be that the last big records has not yet been 
fully moved, and a small one is missing
-                       assertFalse(serializer.hasData());
+                       assertFalse(serializer.hasSerializedData());
                        assertFalse(deserializer.hasUnfinishedData());
                }
                catch (Exception e) {

Reply via email to