[hotfix][test] Deduplicate code in LargeRecordsTest and SpanningRecordSerializationTest
Dedupilcated code was effectively identical, but implemented in a slightly different way. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66ac59f2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66ac59f2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66ac59f2 Branch: refs/heads/master Commit: 66ac59f285d1bc8d8b633a0dea318af92734d689 Parents: 56d3184 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Tue Jan 16 11:37:06 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:13 2018 +0100 ---------------------------------------------------------------------- .../SpanningRecordSerializationTest.java | 53 +++++-- .../network/serialization/LargeRecordsTest.java | 146 ------------------- 2 files changed, 39 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/66ac59f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index d32e075..af26d8d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType; +import org.apache.flink.testutils.serialization.types.IntType; import org.apache.flink.testutils.serialization.types.SerializationTestType; import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; import org.apache.flink.testutils.serialization.types.Util; @@ -27,6 +29,9 @@ import org.junit.Assert; import org.junit.Test; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; @@ -71,22 +76,43 @@ public class SpanningRecordSerializationTest { testSpillingDeserializer(Util.randomRecords(numValues), segmentSize); } + @Test + public void testHandleMixedLargeRecords() throws Exception { + final int numValues = 99; + final int segmentSize = 32 * 1024; + + List<SerializationTestType> originalRecords = new ArrayList<>((numValues + 1) / 2); + LargeObjectType genLarge = new LargeObjectType(); + Random rnd = new Random(); + + for (int i = 0; i < numValues; i++) { + if (i % 2 == 0) { + originalRecords.add(new IntType(42)); + } else { + originalRecords.add(genLarge.getRandom(rnd)); + } + } + + testNonSpillingDeserializer(originalRecords, segmentSize); + testSpillingDeserializer(originalRecords, segmentSize); + } + // ----------------------------------------------------------------------------------------------------------------- - private void testNonSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception { + private void testNonSpillingDeserializer(Iterable<SerializationTestType> records, int segmentSize) throws Exception { RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>(); RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<>(); - test(records, segmentSize, serializer, deserializer); + testSerializationRoundTrip(records, segmentSize, serializer, deserializer); } - private void testSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception { + private void testSpillingDeserializer(Iterable<SerializationTestType> records, int segmentSize) throws Exception { RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>(); RecordDeserializer<SerializationTestType> deserializer = - new SpillingAdaptiveSpanningRecordDeserializer<>( - new String[] { System.getProperty("java.io.tmpdir") }); + new SpillingAdaptiveSpanningRecordDeserializer<>( + new String[]{System.getProperty("java.io.tmpdir")}); - test(records, segmentSize, serializer, deserializer); + testSerializationRoundTrip(records, segmentSize, serializer, deserializer); } /** @@ -98,26 +124,24 @@ public class SpanningRecordSerializationTest { * @param records records to test * @param segmentSize size for the {@link MemorySegment} */ - private void test(Util.MockRecords records, int segmentSize, + private static void testSerializationRoundTrip( + Iterable<SerializationTestType> records, + int segmentSize, RecordSerializer<SerializationTestType> serializer, - RecordDeserializer<SerializationTestType> deserializer) throws Exception { - - final int serializationOverhead = 4; // length encoding - + RecordDeserializer<SerializationTestType> deserializer) + throws Exception { final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>(); // ------------------------------------------------------------------------------------------------------------- serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); - int numBytes = 0; int numRecords = 0; for (SerializationTestType record : records) { serializedRecords.add(record); numRecords++; - numBytes += record.length() + serializationOverhead; // serialize record if (serializer.addRecord(record).isFullBuffer()) { @@ -137,6 +161,7 @@ public class SpanningRecordSerializationTest { } } + // move buffers as long as necessary (for long records) while (serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer()) { deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize); } @@ -144,7 +169,7 @@ public class SpanningRecordSerializationTest { } // deserialize left over records - deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), (numBytes % segmentSize)); + deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize); while (!serializedRecords.isEmpty()) { SerializationTestType expected = serializedRecords.poll(); http://git-wip-us.apache.org/repos/asf/flink/blob/66ac59f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java deleted file mode 100644 index 79da706..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.serialization; - -import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; -import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType; -import org.apache.flink.testutils.serialization.types.IntType; -import org.apache.flink.testutils.serialization.types.SerializationTestType; - -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class LargeRecordsTest { - private final static int NUM_RECORDS = 99; - private final static int SEGMENT_SIZE = 32 * 1024; - - @Test - public void testHandleMixedLargeRecords() throws IOException { - RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<>(); - testHandleMixedLargeRecords(deserializer); - } - - @Test - public void testHandleMixedLargeRecordsSpillingAdaptiveSerializer() throws IOException { - RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( - new String[]{System.getProperty("java.io.tmpdir")}); - testHandleMixedLargeRecords(deserializer); - } - - private void testHandleMixedLargeRecords(RecordDeserializer<SerializationTestType> deserializer) throws IOException { - final int NUM_RECORDS = 99; - final int SEGMENT_SIZE = 32 * 1024; - - final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>(); - - List<SerializationTestType> originalRecords = new ArrayList<>((NUM_RECORDS + 1) / 2); - List<SerializationTestType> deserializedRecords = new ArrayList<>((NUM_RECORDS + 1) / 2); - - LargeObjectType genLarge = new LargeObjectType(); - - Random rnd = new Random(); - - for (int i = 0; i < NUM_RECORDS; i++) { - if (i % 2 == 0) { - originalRecords.add(new IntType(42)); - deserializedRecords.add(new IntType()); - } else { - originalRecords.add(genLarge.getRandom(rnd)); - deserializedRecords.add(new LargeObjectType()); - } - } - - // ------------------------------------------------------------------------------------------------------------- - - serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)); - - int numRecordsDeserialized = 0; - - for (SerializationTestType record : originalRecords) { - - // serialize record - if (serializer.addRecord(record).isFullBuffer()) { - - // buffer is full => move to deserializer - deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), SEGMENT_SIZE); - - // deserialize records, as many complete as there are - while (numRecordsDeserialized < deserializedRecords.size()) { - SerializationTestType next = deserializedRecords.get(numRecordsDeserialized); - - if (deserializer.getNextRecord(next).isFullRecord()) { - assertEquals(originalRecords.get(numRecordsDeserialized), next); - numRecordsDeserialized++; - } else { - break; - } - } - - // move buffers as long as necessary (for long records) - while (serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer()) { - deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), SEGMENT_SIZE); - } - - // deserialize records, as many as there are in the last buffer - while (numRecordsDeserialized < deserializedRecords.size()) { - SerializationTestType next = deserializedRecords.get(numRecordsDeserialized); - - if (deserializer.getNextRecord(next).isFullRecord()) { - assertEquals(originalRecords.get(numRecordsDeserialized), next); - numRecordsDeserialized++; - } else { - break; - } - } - } - } - - // move the last (incomplete buffer) - Buffer last = serializer.getCurrentBuffer(); - deserializer.setNextMemorySegment(last.getMemorySegment(), last.getSize()); - serializer.clear(); - - // deserialize records, as many as there are in the last buffer - while (numRecordsDeserialized < deserializedRecords.size()) { - SerializationTestType next = deserializedRecords.get(numRecordsDeserialized); - - assertTrue(deserializer.getNextRecord(next).isFullRecord()); - assertEquals(originalRecords.get(numRecordsDeserialized), next); - numRecordsDeserialized++; - } - - // might be that the last big records has not yet been fully moved, and a small one is missing - assertFalse(serializer.hasSerializedData()); - assertFalse(deserializer.hasUnfinishedData()); - } -}