[hotfix][runtime] Rename setNextBufferBuilder to continueWritingWithNextBufferBuilder
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa024726 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa024726 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa024726 Branch: refs/heads/master Commit: fa024726bb801fc71cec5cc303cac1d4a03f555e Parents: bc55d7a Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Feb 15 14:23:36 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Mon Feb 19 15:05:34 2018 +0100 ---------------------------------------------------------------------- .../network/api/serialization/RecordSerializer.java | 2 +- .../api/serialization/SpanningRecordSerializer.java | 6 +++--- .../runtime/io/network/api/writer/RecordWriter.java | 2 +- .../SpanningRecordSerializationTest.java | 2 +- .../serialization/SpanningRecordSerializerTest.java | 14 +++++++------- .../consumer/IteratorWrappingTestSingleInputGate.java | 2 +- .../partition/consumer/StreamTestSingleInputGate.java | 2 +- 7 files changed, 15 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fa024726/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index a74a068..25d2927 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -83,7 +83,7 @@ public interface RecordSerializer<T extends IOReadableWritable> { * @return how much information was written to the target buffer and * whether this buffer is full */ - SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException; + SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException; /** * Clear and release internal state. http://git-wip-us.apache.org/repos/asf/flink/blob/fa024726/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index d7befeb..c4ab53f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -32,7 +32,7 @@ import java.nio.ByteOrder; /** * Record serializer which serializes the complete record to an intermediate * data serialization buffer and copies this buffer to target buffers - * one-by-one using {@link #setNextBufferBuilder(BufferBuilder)}. + * one-by-one using {@link #continueWritingWithNextBufferBuilder(BufferBuilder)}. * * @param <T> The type of the records that are serialized. */ @@ -60,7 +60,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R lengthBuffer = ByteBuffer.allocate(4); lengthBuffer.order(ByteOrder.BIG_ENDIAN); - // ensure initial state with hasRemaining false (for correct setNextBufferBuilder logic) + // ensure initial state with hasRemaining false (for correct continueWritingWithNextBufferBuilder logic) dataBuffer = serializationBuffer.wrapAsByteBuffer(); lengthBuffer.position(4); } @@ -103,7 +103,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R } @Override - public SerializationResult setNextBufferBuilder(BufferBuilder buffer) throws IOException { + public SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder buffer) throws IOException { targetBuffer = buffer; boolean mustCommit = false; http://git-wip-us.apache.org/repos/asf/flink/blob/fa024726/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 51dfbde..fabac9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -132,7 +132,7 @@ public class RecordWriter<T extends IOReadableWritable> { } BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); - result = serializer.setNextBufferBuilder(bufferBuilder); + result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder); } checkState(!serializer.hasSerializedData(), "All data should be written at once"); } http://git-wip-us.apache.org/repos/asf/flink/blob/fa024726/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index aa09681..16b77e6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -196,7 +196,7 @@ public class SpanningRecordSerializationTest { serializer.clear(); return new BufferConsumerAndSerializerResult( bufferConsumer, - serializer.setNextBufferBuilder(bufferBuilder)); + serializer.continueWritingWithNextBufferBuilder(bufferBuilder)); } private static class BufferConsumerAndSerializerResult { http://git-wip-us.apache.org/repos/asf/flink/blob/fa024726/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java index f1f9865..c39b54a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java @@ -50,10 +50,10 @@ public class SpanningRecordSerializerTest { serializer.addRecord(randomIntRecord); Assert.assertTrue(serializer.hasSerializedData()); - serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); + serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); Assert.assertFalse(serializer.hasSerializedData()); - serializer.setNextBufferBuilder(createBufferBuilder(8)); + serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8)); serializer.addRecord(randomIntRecord); Assert.assertFalse(serializer.hasSerializedData()); @@ -72,7 +72,7 @@ public class SpanningRecordSerializerTest { try { Assert.assertEquals( RecordSerializer.SerializationResult.FULL_RECORD, - serializer.setNextBufferBuilder(createBufferBuilder(segmentSize))); + serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize))); } catch (IOException e) { e.printStackTrace(); Assert.fail(e.getMessage()); @@ -115,7 +115,7 @@ public class SpanningRecordSerializerTest { result = serializer.addRecord(emptyRecord); Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); - result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); + result = serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); } @@ -169,7 +169,7 @@ public class SpanningRecordSerializerTest { // ------------------------------------------------------------------------------------------------------------- - serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); + serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); int numBytes = 0; for (SerializationTestType record : records) { @@ -180,14 +180,14 @@ public class SpanningRecordSerializerTest { Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); } else if (numBytes == segmentSize) { Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result); - serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); + serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); numBytes = 0; } else { Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); while (result.isFullBuffer()) { numBytes -= segmentSize; - result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); + result = serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/fa024726/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index 9de2bbe..105e35f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -75,7 +75,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e if (hasData) { serializer.clear(); BufferBuilder bufferBuilder = createBufferBuilder(bufferSize); - serializer.setNextBufferBuilder(bufferBuilder); + serializer.continueWritingWithNextBufferBuilder(bufferBuilder); serializer.addRecord(reuse); hasData = inputIterator.next(reuse) != null; http://git-wip-us.apache.org/repos/asf/flink/blob/fa024726/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 11254ef..6ab8074 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -112,7 +112,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { Object inputElement = input.getStreamRecord(); BufferBuilder bufferBuilder = createBufferBuilder(bufferSize); - recordSerializer.setNextBufferBuilder(bufferBuilder); + recordSerializer.continueWritingWithNextBufferBuilder(bufferBuilder); delegate.setInstance(inputElement); recordSerializer.addRecord(delegate); bufferBuilder.finish();