[hotfix][test] Deduplicate code in LargeRecordsTest and 
SpanningRecordSerializationTest

The deduplicated code was effectively identical, but implemented in slightly 
different ways.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66ac59f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66ac59f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66ac59f2

Branch: refs/heads/master
Commit: 66ac59f285d1bc8d8b633a0dea318af92734d689
Parents: 56d3184
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Tue Jan 16 11:37:06 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:13 2018 +0100

----------------------------------------------------------------------
 .../SpanningRecordSerializationTest.java        |  53 +++++--
 .../network/serialization/LargeRecordsTest.java | 146 -------------------
 2 files changed, 39 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66ac59f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index d32e075..af26d8d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
+import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import 
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
@@ -27,6 +29,9 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
@@ -71,22 +76,43 @@ public class SpanningRecordSerializationTest {
                testSpillingDeserializer(Util.randomRecords(numValues), 
segmentSize);
        }
 
+       @Test
+       public void testHandleMixedLargeRecords() throws Exception {
+               final int numValues = 99;
+               final int segmentSize = 32 * 1024;
+
+               List<SerializationTestType> originalRecords = new 
ArrayList<>((numValues + 1) / 2);
+               LargeObjectType genLarge = new LargeObjectType();
+               Random rnd = new Random();
+
+               for (int i = 0; i < numValues; i++) {
+                       if (i % 2 == 0) {
+                               originalRecords.add(new IntType(42));
+                       } else {
+                               originalRecords.add(genLarge.getRandom(rnd));
+                       }
+               }
+
+               testNonSpillingDeserializer(originalRecords, segmentSize);
+               testSpillingDeserializer(originalRecords, segmentSize);
+       }
+
        // 
-----------------------------------------------------------------------------------------------------------------
 
-       private void testNonSpillingDeserializer(Util.MockRecords records, int 
segmentSize) throws Exception {
+       private void 
testNonSpillingDeserializer(Iterable<SerializationTestType> records, int 
segmentSize) throws Exception {
                RecordSerializer<SerializationTestType> serializer = new 
SpanningRecordSerializer<>();
                RecordDeserializer<SerializationTestType> deserializer = new 
AdaptiveSpanningRecordDeserializer<>();
 
-               test(records, segmentSize, serializer, deserializer);
+               testSerializationRoundTrip(records, segmentSize, serializer, 
deserializer);
        }
 
-       private void testSpillingDeserializer(Util.MockRecords records, int 
segmentSize) throws Exception {
+       private void testSpillingDeserializer(Iterable<SerializationTestType> 
records, int segmentSize) throws Exception {
                RecordSerializer<SerializationTestType> serializer = new 
SpanningRecordSerializer<>();
                RecordDeserializer<SerializationTestType> deserializer =
-                               new 
SpillingAdaptiveSpanningRecordDeserializer<>(
-                                               new String[] { 
System.getProperty("java.io.tmpdir") });
+                       new SpillingAdaptiveSpanningRecordDeserializer<>(
+                               new 
String[]{System.getProperty("java.io.tmpdir")});
 
-               test(records, segmentSize, serializer, deserializer);
+               testSerializationRoundTrip(records, segmentSize, serializer, 
deserializer);
        }
 
        /**
@@ -98,26 +124,24 @@ public class SpanningRecordSerializationTest {
         * @param records records to test
         * @param segmentSize size for the {@link MemorySegment}
         */
-       private void test(Util.MockRecords records, int segmentSize,
+       private static void testSerializationRoundTrip(
+                       Iterable<SerializationTestType> records,
+                       int segmentSize,
                        RecordSerializer<SerializationTestType> serializer,
-                       RecordDeserializer<SerializationTestType> deserializer) 
throws Exception {
-
-               final int serializationOverhead = 4; // length encoding
-
+                       RecordDeserializer<SerializationTestType> deserializer)
+               throws Exception {
                final ArrayDeque<SerializationTestType> serializedRecords = new 
ArrayDeque<>();
 
                // 
-------------------------------------------------------------------------------------------------------------
 
                
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 
-               int numBytes = 0;
                int numRecords = 0;
                for (SerializationTestType record : records) {
 
                        serializedRecords.add(record);
 
                        numRecords++;
-                       numBytes += record.length() + serializationOverhead;
 
                        // serialize record
                        if (serializer.addRecord(record).isFullBuffer()) {
@@ -137,6 +161,7 @@ public class SpanningRecordSerializationTest {
                                        }
                                }
 
+                               // move buffers as long as necessary (for long 
records)
                                while 
(serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer())
 {
                                        
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 segmentSize);
                                }
@@ -144,7 +169,7 @@ public class SpanningRecordSerializationTest {
                }
 
                // deserialize left over records
-               
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 (numBytes % segmentSize));
+               
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 segmentSize);
 
                while (!serializedRecords.isEmpty()) {
                        SerializationTestType expected = 
serializedRecords.poll();

http://git-wip-us.apache.org/repos/asf/flink/blob/66ac59f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
deleted file mode 100644
index 79da706..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.serialization;
-
-import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
-import org.apache.flink.testutils.serialization.types.IntType;
-import org.apache.flink.testutils.serialization.types.SerializationTestType;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LargeRecordsTest {
-       private final static int NUM_RECORDS = 99;
-       private final static int SEGMENT_SIZE = 32 * 1024;
-
-       @Test
-       public void testHandleMixedLargeRecords() throws IOException {
-               RecordDeserializer<SerializationTestType> deserializer = new 
AdaptiveSpanningRecordDeserializer<>();
-               testHandleMixedLargeRecords(deserializer);
-       }
-
-       @Test
-       public void testHandleMixedLargeRecordsSpillingAdaptiveSerializer() 
throws IOException {
-               RecordDeserializer<SerializationTestType> deserializer = new 
SpillingAdaptiveSpanningRecordDeserializer<>(
-                               new 
String[]{System.getProperty("java.io.tmpdir")});
-               testHandleMixedLargeRecords(deserializer);
-       }
-
-       private void 
testHandleMixedLargeRecords(RecordDeserializer<SerializationTestType> 
deserializer) throws IOException {
-               final int NUM_RECORDS = 99;
-               final int SEGMENT_SIZE = 32 * 1024;
-
-               final RecordSerializer<SerializationTestType> serializer = new 
SpanningRecordSerializer<>();
-
-               List<SerializationTestType> originalRecords = new 
ArrayList<>((NUM_RECORDS + 1) / 2);
-               List<SerializationTestType> deserializedRecords = new 
ArrayList<>((NUM_RECORDS + 1) / 2);
-
-               LargeObjectType genLarge = new LargeObjectType();
-
-               Random rnd = new Random();
-
-               for (int i = 0; i < NUM_RECORDS; i++) {
-                       if (i % 2 == 0) {
-                               originalRecords.add(new IntType(42));
-                               deserializedRecords.add(new IntType());
-                       } else {
-                               originalRecords.add(genLarge.getRandom(rnd));
-                               deserializedRecords.add(new LargeObjectType());
-                       }
-               }
-
-               // 
-------------------------------------------------------------------------------------------------------------
-
-               
serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
-
-               int numRecordsDeserialized = 0;
-
-               for (SerializationTestType record : originalRecords) {
-
-                       // serialize record
-                       if (serializer.addRecord(record).isFullBuffer()) {
-
-                               // buffer is full => move to deserializer
-                               
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 SEGMENT_SIZE);
-
-                               // deserialize records, as many complete as 
there are
-                               while (numRecordsDeserialized < 
deserializedRecords.size()) {
-                                       SerializationTestType next = 
deserializedRecords.get(numRecordsDeserialized);
-
-                                       if 
(deserializer.getNextRecord(next).isFullRecord()) {
-                                               
assertEquals(originalRecords.get(numRecordsDeserialized), next);
-                                               numRecordsDeserialized++;
-                                       } else {
-                                               break;
-                                       }
-                               }
-
-                               // move buffers as long as necessary (for long 
records)
-                               while 
(serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer())
 {
-                                       
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 SEGMENT_SIZE);
-                               }
-
-                               // deserialize records, as many as there are in 
the last buffer
-                               while (numRecordsDeserialized < 
deserializedRecords.size()) {
-                                       SerializationTestType next = 
deserializedRecords.get(numRecordsDeserialized);
-
-                                       if 
(deserializer.getNextRecord(next).isFullRecord()) {
-                                               
assertEquals(originalRecords.get(numRecordsDeserialized), next);
-                                               numRecordsDeserialized++;
-                                       } else {
-                                               break;
-                                       }
-                               }
-                       }
-               }
-
-               // move the last (incomplete buffer)
-               Buffer last = serializer.getCurrentBuffer();
-               deserializer.setNextMemorySegment(last.getMemorySegment(), 
last.getSize());
-               serializer.clear();
-
-               // deserialize records, as many as there are in the last buffer
-               while (numRecordsDeserialized < deserializedRecords.size()) {
-                       SerializationTestType next = 
deserializedRecords.get(numRecordsDeserialized);
-
-                       
assertTrue(deserializer.getNextRecord(next).isFullRecord());
-                       
assertEquals(originalRecords.get(numRecordsDeserialized), next);
-                       numRecordsDeserialized++;
-               }
-
-               // might be that the last big records has not yet been fully 
moved, and a small one is missing
-               assertFalse(serializer.hasSerializedData());
-               assertFalse(deserializer.hasUnfinishedData());
-       }
-}

Reply via email to