[hotfix][benchmarks] Add network stack benchmarks for LocalInputChannels

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08f7284a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08f7284a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08f7284a

Branch: refs/heads/master
Commit: 08f7284ab047ada1be0c7e0447000f179d8f33a2
Parents: 5b1e127
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Feb 1 12:05:59 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:42 2018 +0100

----------------------------------------------------------------------
 .../StreamNetworkBenchmarkEnvironment.java      | 19 ++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java     |  2 +-
 .../StreamNetworkThroughputBenchmark.java       | 21 ++++++++++++--------
 .../StreamNetworkThroughputBenchmarkTests.java  | 16 +++++++++++++++
 4 files changed, 43 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08f7284a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 553503b..b1613f2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -89,20 +89,27 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
        protected IOManager ioManager;
 
        protected int channels;
+       protected boolean localMode = false;
 
        protected ResultPartitionID[] partitionIds;
 
-       public void setUp(int writers, int channels) throws Exception {
+       public void setUp(int writers, int channels, boolean localMode) throws Exception {
+               this.localMode = localMode;
                this.channels = channels;
                this.partitionIds = new ResultPartitionID[writers];
 
-               int bufferPoolSize = Math.max(2048, writers * channels * 4);
-               senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
-               receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
                ioManager = new IOManagerAsync();
 
+               int bufferPoolSize = Math.max(2048, writers * channels * 4);
+               senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
                senderEnv.start();
-               receiverEnv.start();
+               if (localMode) {
+                       receiverEnv = senderEnv;
+               }
+               else {
+                       receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+                       receiverEnv.start();
+               }
 
                generatePartitionIds();
        }
@@ -206,7 +213,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
                        InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
                                .map(partitionId -> new InputChannelDeploymentDescriptor(
                                        partitionId,
-                                       ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
+                                       localMode ? ResultPartitionLocation.createLocal() : ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
                                .toArray(InputChannelDeploymentDescriptor[]::new);
 
                        final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/08f7284a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 843d3e2..cc302e8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -70,7 +70,7 @@ public class StreamNetworkPointToPointBenchmark {
         */
        public void setUp(long flushTimeout) throws Exception {
                environment = new StreamNetworkBenchmarkEnvironment<>();
-               environment.setUp(1, 1);
+               environment.setUp(1, 1, false);
 
                receiver = environment.createReceiver();
                recordWriter = environment.createRecordWriter(0, flushTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/08f7284a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 3f41b00..fe08993 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -28,19 +28,20 @@ import java.util.concurrent.TimeUnit;
  * <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
  */
 public class StreamNetworkThroughputBenchmark {
-       private static final long RECEIVER_TIMEOUT = 30_000;
-
        private StreamNetworkBenchmarkEnvironment<LongValue> environment;
        private ReceiverThread receiver;
        private LongRecordWriterThread[] writerThreads;
 
+       public void executeBenchmark(long records) throws Exception {
+               executeBenchmark(records, Long.MAX_VALUE);
+       }
+
        /**
         * Executes the throughput benchmark with the given number of records.
         *
-        * @param records
-        *              records to pass through the network stack
+        * @param records to pass through the network stack
         */
-       public void executeBenchmark(long records) throws Exception {
+       public void executeBenchmark(long records, long timeout) throws Exception {
                final LongValue value = new LongValue();
                value.setValue(0);
 
@@ -51,7 +52,11 @@ public class StreamNetworkThroughputBenchmark {
                        writerThread.setRecordsToSend(lastRecord);
                }
 
-               recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
+               recordsReceived.get(timeout, TimeUnit.MILLISECONDS);
+       }
+
+       public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception {
+               setUp(recordWriters, channels, flushTimeout, false);
        }
 
        /**
@@ -63,9 +68,9 @@ public class StreamNetworkThroughputBenchmark {
         * @param channels
         *              number of outgoing channels / receivers
         */
-       public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception {
+       public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
                environment = new StreamNetworkBenchmarkEnvironment<>();
-               environment.setUp(recordWriters, channels);
+               environment.setUp(recordWriters, channels, localMode);
                receiver = environment.createReceiver();
                writerThreads = new LongRecordWriterThread[recordWriters];
                for (int writer = 0; writer < recordWriters; writer++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/08f7284a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
index 8af8148..a8251a8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
@@ -37,6 +37,22 @@ public class StreamNetworkThroughputBenchmarkTests {
        }
 
        @Test
+       public void largeLocalMode() throws Exception {
+               StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+               env.setUp(4, 10, 100, true);
+               env.executeBenchmark(10_000_000);
+               env.tearDown();
+       }
+
+       @Test
+       public void largeRemoteMode() throws Exception {
+               StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+               env.setUp(4, 10, 100, false);
+               env.executeBenchmark(10_000_000);
+               env.tearDown();
+       }
+
+       @Test
        public void pointToMultiPointBenchmark() throws Exception {
                StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
                benchmark.setUp(1, 100, 100);

Reply via email to