[FLINK-8586][tests] Clean up hard to maintain tests

SpilledSubpartitionViewTest duplicates a lot of production logic 
(TestSubpartitionConsumer is a
duplicated logic of LocalInputChannel and mix of 
CreditBasedSequenceNumberingViewReader with PartitionRequestQueue.
Also it seems like most of the logic is covered by SpillableSubpartitionTest.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89605adb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89605adb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89605adb

Branch: refs/heads/master
Commit: 89605adb68b5cbbed6b0370355ec5ab343059910
Parents: 635c29d
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Tue Jan 30 09:17:50 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:34 2018 +0100

----------------------------------------------------------------------
 .../partition/SpilledSubpartitionViewTest.java  | 224 -------------------
 .../network/util/TestSubpartitionConsumer.java  |   1 +
 2 files changed, 1 insertion(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89605adb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
deleted file mode 100644
index 08444f9..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for {@link SpillableSubpartitionView}, in addition to indirect tests 
via {@link
- * SpillableSubpartitionTest}.
- */
-public class SpilledSubpartitionViewTest {
-
-       private static final IOManager IO_MANAGER = new IOManagerAsync();
-
-       @AfterClass
-       public static void shutdown() {
-               IO_MANAGER.shutdown();
-       }
-
-       @Test
-       public void testWriteConsume() throws Exception {
-               // Config
-               final int numberOfBuffersToWrite = 512;
-
-               // Setup
-               final BufferFileWriter writer = 
createWriterAndWriteBuffers(numberOfBuffersToWrite);
-
-               writer.close();
-
-               TestPooledBufferProvider viewBufferPool = new 
TestPooledBufferProvider(1);
-
-               TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(
-                       false, new TestConsumerCallback.RecyclingCallback());
-
-               SpilledSubpartitionView view = new SpilledSubpartitionView(
-                       mock(SpillableSubpartition.class),
-                       viewBufferPool.getMemorySegmentSize(),
-                       writer,
-                       numberOfBuffersToWrite + 1, // +1 for end-of-partition
-                       consumer);
-
-               consumer.setSubpartitionView(view);
-
-               // Consume subpartition
-               consumer.call();
-       }
-
-       @Test
-       public void testConsumeWithFewBuffers() throws Exception {
-               // Config
-               final int numberOfBuffersToWrite = 512;
-
-               // Setup
-               final BufferFileWriter writer = 
createWriterAndWriteBuffers(numberOfBuffersToWrite);
-
-               writer.close();
-
-               TestSubpartitionConsumer consumer = new 
TestSubpartitionConsumer(
-                       false, new TestConsumerCallback.RecyclingCallback());
-
-               SpilledSubpartitionView view = new SpilledSubpartitionView(
-                       mock(SpillableSubpartition.class),
-                       32 * 1024,
-                       writer,
-                       numberOfBuffersToWrite + 1,
-                       consumer);
-
-               consumer.setSubpartitionView(view);
-
-               // No buffer available, don't deadlock. We need to make 
progress in situations when the view
-               // is consumed at an input gate with local and remote channels. 
The remote channels might
-               // eat up all the buffers, at which point the spilled view will 
not have any buffers
-               // available and the input gate can't make any progress if we 
don't return immediately.
-               //
-               // The current solution is straight-forward with a separate 
buffer per spilled subpartition,
-               // but introduces memory-overhead.
-               //
-               // TODO Replace with asynchronous buffer pool request as this 
introduces extra buffers per
-               // consumed subpartition.
-               consumer.call();
-       }
-
-       @Test
-       public void testReadMultipleFilesWithSingleBufferPool() throws 
Exception {
-               ExecutorService executor = null;
-               BufferFileWriter[] writers = null;
-               ResultSubpartitionView[] readers = null;
-
-               try {
-                       executor = Executors.newCachedThreadPool();
-
-                       // Setup
-                       writers = new BufferFileWriter[]{
-                               createWriterAndWriteBuffers(512),
-                               createWriterAndWriteBuffers(512)
-                       };
-
-                       readers = new ResultSubpartitionView[writers.length];
-                       TestSubpartitionConsumer[] consumers = new 
TestSubpartitionConsumer[writers.length];
-
-                       BufferProvider inputBuffers = new 
TestPooledBufferProvider(2);
-
-                       SpillableSubpartition parent = 
mock(SpillableSubpartition.class);
-
-                       // Wait for writers to finish
-                       for (BufferFileWriter writer : writers) {
-                               writer.close();
-                       }
-
-                       // Create the views depending on the test configuration
-                       for (int i = 0; i < readers.length; i++) {
-                               consumers[i] = new TestSubpartitionConsumer(
-                                       false, new 
TestConsumerCallback.RecyclingCallback());
-
-                               readers[i] = new SpilledSubpartitionView(
-                                       parent,
-                                       inputBuffers.getMemorySegmentSize(),
-                                       writers[i],
-                                       512 + 1, // +1 for end of partition 
event
-                                       consumers[i]);
-
-                               consumers[i].setSubpartitionView(readers[i]);
-                       }
-
-                       final List<Future<Boolean>> results = 
Lists.newArrayList();
-
-                       // Submit the consuming tasks
-                       for (TestSubpartitionConsumer consumer : consumers) {
-                               results.add(executor.submit(consumer));
-                       }
-
-                       // Wait for the results
-                       for (Future<Boolean> res : results) {
-                               try {
-                                       res.get(2, TimeUnit.MINUTES);
-                               } catch (TimeoutException e) {
-                                       throw new TimeoutException("There has 
been a timeout in the test. This " +
-                                               "indicates that there is a 
bug/deadlock in the tested subpartition " +
-                                               "view.");
-                               }
-                       }
-               } finally {
-                       if (writers != null) {
-                               for (BufferFileWriter writer : writers) {
-                                       if (writer != null) {
-                                               writer.deleteChannel();
-                                       }
-                               }
-                       }
-
-                       if (readers != null) {
-                               for (ResultSubpartitionView reader : readers) {
-                                       if (reader != null) {
-                                               reader.releaseAllResources();
-                                       }
-                               }
-                       }
-
-                       if (executor != null) {
-                               executor.shutdown();
-                       }
-               }
-       }
-
-       /**
-        * Returns a buffer file writer, to which the specified number of 
buffer write requests have
-        * been issued (including an end of partition event).
-        *
-        * <p> Call {@link BufferFileWriter#close()} to ensure that all buffers 
have been written.
-        */
-       private static BufferFileWriter createWriterAndWriteBuffers(int 
numberOfBuffers) throws IOException {
-
-               final BufferFileWriter writer = 
IO_MANAGER.createBufferFileWriter(IO_MANAGER.createChannel());
-
-               for (int i = 0; i < numberOfBuffers; i++) {
-                       
writer.writeBlock(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE));
-               }
-
-               
writer.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
-
-               return writer;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/89605adb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index b4bdd3e..2c6ee50 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -40,6 +40,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @see TestConsumerCallback
  */
+@Deprecated
 public class TestSubpartitionConsumer implements Callable<Boolean>, 
BufferAvailabilityListener {
 
        private static final int MAX_SLEEP_TIME_MS = 20;

Reply via email to