[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327063876
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.testutils.serialization.types.Util;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link BroadcastRecordWriter}.
+ */
+public class BroadcastRecordWriterTest extends RecordWriterTest {
+
+   public BroadcastRecordWriterTest() {
+       super(true);
+   }
+
+   /**
+    * Tests the number of requested buffers and results are correct in the case of switching
+    * modes between {@link BroadcastRecordWriter#broadcastEmit(IOReadableWritable)} and
+    * {@link BroadcastRecordWriter#randomEmit(IOReadableWritable)}.
+    */
+   @Test
+   public void testBroadcastMixedRandomEmitRecord() throws Exception {
 
 Review comment:
   I couldn't follow the test even after reading it three times. Could you please add some comments between the steps?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327062898
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -225,46 +178,56 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
        numBuffersOut = metrics.getNumBuffersOutCounter();
    }
 
-   /**
-    * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
-    */
-   private void tryFinishCurrentBufferBuilder(int targetChannel) {
-       if (!bufferBuilders[targetChannel].isPresent()) {
-           return;
-       }
-       BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
-       bufferBuilders[targetChannel] = Optional.empty();
+   protected void finishBufferBuilder(BufferBuilder bufferBuilder) {
        numBytesOut.inc(bufferBuilder.finish());
        numBuffersOut.inc();
    }
 
+   /**
+    * This is used to send regular records.
+    */
+   public abstract void emit(T record) throws IOException, InterruptedException;
+
+   /**
+    * This is used to send LatencyMarks to a random target channel.
+    */
+   public abstract void randomEmit(T record) throws IOException, InterruptedException;
+
+   /**
+    * This is used to broadcast streaming Watermarks in-band with records.
+    */
+   public abstract void broadcastEmit(T record) throws IOException, InterruptedException;
+
    /**
     * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need
     * request a new one for this target channel.
     */
-   private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-       if (bufferBuilders[targetChannel].isPresent()) {
-           return bufferBuilders[targetChannel].get();
-       } else {
-           return requestNewBufferBuilder(targetChannel);
-       }
-   }
+   abstract BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException;
 
-   private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-       checkState(!bufferBuilders[targetChannel].isPresent() || bufferBuilders[targetChannel].get().isFinished());
+   /**
+    * Requests a new {@link BufferBuilder} for the target channel and returns it.
+    */
+   abstract BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException;
 
-       BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
-       bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
-       targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
-       return bufferBuilder;
-   }
+   /**
+    * Marks the current {@link BufferBuilder} as finished if present and clears the state for next one.
+    */
+   abstract void tryFinishCurrentBufferBuilder(int targetChannel);
 
-   private void closeBufferBuilder(int targetChannel) {
-       if (bufferBuilders[targetChannel].isPresent()) {
-           bufferBuilders[targetChannel].get().finish();
-           bufferBuilders[targetChannel] = Optional.empty();
-       }
-   }
+   /**
+    * Marks the current {@link BufferBuilder} as empty for the target channel.
+    */
+   abstract void emptyCurrentBufferBuilder(int targetChannel);
+
+   /**
+    * Marks the current {@link BufferBuilder} as finished and empty for the target channel.
 
 Review comment:
   suggestion: Marks the current {@link BufferBuilder} as finished and releases the resources for the target channel.
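The suggested wording separates finishing a builder from releasing it. The lines removed in this hunk already show both behaviours: `tryFinishCurrentBufferBuilder` finishes the builder, adds its size to `numBytesOut`, increments `numBuffersOut` and clears the slot, while `closeBufferBuilder` finishes the builder and clears the slot without touching the metrics. A minimal stand-alone sketch of that difference follows. `Builder` is a hypothetical stand-in for Flink's `BufferBuilder`, the metrics are plain `long` fields, and absent builders are `null` entries in a plain array rather than `Optional`s, so this is an illustration under those assumptions, not the PR's code.

```java
/**
 * Hypothetical sketch, not Flink code: contrasts "finish and count" with
 * "finish and release" for per-channel buffer builders.
 */
public class BuilderLifecycleSketch {

    /** Minimal stand-in for Flink's BufferBuilder. */
    static final class Builder {
        private int bytes;
        private boolean finished;

        void append(int numBytes) {
            bytes += numBytes;
        }

        /** Marks the builder as finished and returns the number of bytes written. */
        int finish() {
            finished = true;
            return bytes;
        }

        boolean isFinished() {
            return finished;
        }
    }

    /** One slot per channel; null means "no builder for this channel". */
    private final Builder[] builders;
    long numBytesOut;
    long numBuffersOut;

    BuilderLifecycleSketch(int numberOfChannels) {
        builders = new Builder[numberOfChannels];
    }

    /** Returns the existing builder for the channel or creates a new one. */
    Builder getBufferBuilder(int targetChannel) {
        if (builders[targetChannel] == null) {
            builders[targetChannel] = new Builder();
        }
        return builders[targetChannel];
    }

    boolean hasBufferBuilder(int targetChannel) {
        return builders[targetChannel] != null;
    }

    /** Finishes the builder if present, updates the metrics and clears the slot. */
    void tryFinishCurrentBufferBuilder(int targetChannel) {
        Builder builder = builders[targetChannel];
        if (builder == null) {
            return;
        }
        builders[targetChannel] = null;
        numBytesOut += builder.finish();
        numBuffersOut++;
    }

    /** Finishes the builder if present and releases it, without updating the metrics. */
    void closeBufferBuilder(int targetChannel) {
        Builder builder = builders[targetChannel];
        if (builder != null) {
            builder.finish();
            builders[targetChannel] = null;
        }
    }

    public static void main(String[] args) {
        BuilderLifecycleSketch writer = new BuilderLifecycleSketch(2);

        writer.getBufferBuilder(0).append(10);
        writer.tryFinishCurrentBufferBuilder(0);   // finished and counted

        Builder closed = writer.getBufferBuilder(1);
        closed.append(5);
        writer.closeBufferBuilder(1);              // finished and released, not counted

        System.out.println(writer.numBytesOut + " " + writer.numBuffersOut + " " + closed.isFinished()); // 10 1 true
    }
}
```

In both cases the slot ends up empty; only the first path reports the buffer in the output metrics.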




[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327062516
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
 ##
 @@ -0,0 +1,142 @@
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A regular record-oriented runtime result writer.
+ *
+ * The ChannelSelectorRecordWriter extends the {@link RecordWriter} and maintains an array of
+ * {@link BufferBuilder}s for all the channels. The {@link #emit(IOReadableWritable)}
+ * operation is based on {@link ChannelSelector} to select the target channel.
+ *
+ * @param <T> the type of the record that can be emitted with this record writer
+ */
+public class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
 Review comment:
   final for JIT optimization?




[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327062444
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##
 @@ -18,30 +18,163 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 
 import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A special record-oriented runtime result writer only for broadcast mode.
  *
- * The BroadcastRecordWriter extends the {@link RecordWriter} and handles {@link #emit(IOReadableWritable)}
- * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more efficient way.
+ * The BroadcastRecordWriter extends the {@link RecordWriter} and maintain a single {@link BufferBuilder}
+ * for all the channels. Then the serialization results need be copied only once to this buffer which would be
+ * shared for all the channels in a more efficient way.
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
 public class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
 Review comment:
   final for JIT optimization?
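The class Javadoc quoted in this hunk states the idea behind the PR: copy the serialization result once into a single buffer that all channels share, instead of once per channel. A minimal sketch of that idea with plain `java.nio.ByteBuffer` follows; it does not use Flink's `BufferBuilder`/`BufferConsumer`, whose sharing mechanics are not shown here. The per-channel variant copies the payload once per channel, while the shared variant copies it once and hands each channel a read-only view from `asReadOnlyBuffer()`, which shares the content but keeps its own position and limit. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: per-channel copies vs. one shared buffer for a broadcast record. */
public class CopyOnceBroadcastSketch {

    /** Per-channel approach: fills every channel slot with its own copy; returns bytes copied. */
    static long copyPerChannel(byte[] serialized, ByteBuffer[] channels) {
        long copied = 0;
        for (int i = 0; i < channels.length; i++) {
            ByteBuffer target = ByteBuffer.allocate(serialized.length);
            target.put(serialized);
            target.flip();
            copied += serialized.length;
            channels[i] = target;
        }
        return copied;
    }

    /** Shared approach: copies once, then gives each channel a read-only view; returns bytes copied. */
    static long copyOnce(byte[] serialized, ByteBuffer[] channels) {
        ByteBuffer shared = ByteBuffer.allocate(serialized.length);
        shared.put(serialized);
        shared.flip();
        for (int i = 0; i < channels.length; i++) {
            channels[i] = shared.asReadOnlyBuffer(); // shares content, independent position
        }
        return serialized.length;
    }

    /** Drains one channel's view and decodes it; other views keep their own positions. */
    static String read(ByteBuffer channel) {
        byte[] out = new byte[channel.remaining()];
        channel.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] record = "watermark".getBytes(StandardCharsets.UTF_8); // 9 bytes
        ByteBuffer[] channels = new ByteBuffer[4];

        System.out.println(copyPerChannel(record, channels)); // 36
        System.out.println(copyOnce(record, channels));       // 9
        System.out.println(read(channels[0]) + " " + read(channels[3])); // watermark watermark
    }
}
```

Because each view has an independent position, one channel draining its view does not change what another channel reads; the copy cost no longer grows with the number of channels.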




[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327062031
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -54,23 +54,17 @@
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
-public class RecordWriter<T extends IOReadableWritable> {
+public abstract class RecordWriter<T extends IOReadableWritable> {
 
    private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
 
-   private final ResultPartitionWriter targetPartition;
+   protected final ResultPartitionWriter targetPartition;
 
-   private final ChannelSelector<T> channelSelector;
+   protected final int numberOfChannels;
 
 Review comment:
   any reason to switch the order of fields here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327061719
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
 ##
 @@ -0,0 +1,142 @@
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A regular record-oriented runtime result writer.
+ *
+ * The ChannelSelectorRecordWriter extends the {@link RecordWriter} and maintains an array of
+ * {@link BufferBuilder}s for all the channels. The {@link #emit(IOReadableWritable)}
+ * operation is based on {@link ChannelSelector} to select the target channel.
+ *
+ * @param <T> the type of the record that can be emitted with this record writer
+ */
+public class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+
+   private final ChannelSelector<T> channelSelector;
+
+   private final Optional<BufferBuilder>[] bufferBuilders;
 
 Review comment:
   See above: no `Optional` fields. Also, mixing generics and arrays is not recommended.
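The point about generics and arrays can be made concrete. Java rejects `new Optional<BufferBuilder>[n]` as generic array creation, so a field typed `Optional<BufferBuilder>[]` has to be created from a raw array with an unchecked conversion, and the array then loses its runtime element-type check. The sketch below uses `Optional<String>` in place of `Optional<BufferBuilder>` to stay self-contained; the class and method names are hypothetical. A plain nullable array such as `BufferBuilder[]`, with `null` meaning "no builder", keeps the runtime check and avoids allocating an `Optional` wrapper per builder.

```java
import java.util.Optional;

/** Hypothetical demo of why arrays of a generic type such as Optional<T>[] are discouraged. */
public class GenericArraySketch {

    /** The only way to get an "Optional<String>[]": a raw array plus an unchecked conversion. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Optional<String>[] newOptionalArray(int size) {
        // "new Optional<String>[size]" does not compile (generic array creation).
        return new Optional[size];
    }

    /** Stores a wrongly typed element through an Object[] alias; erasure lets it through. */
    static Optional<String>[] polluted() {
        Optional<String>[] slots = newOptionalArray(2);
        Object[] alias = slots;
        alias[0] = Optional.of(42); // runtime component type is just Optional: no ArrayStoreException
        return slots;
    }

    /** The same store into an array of a reifiable type fails immediately. */
    static boolean plainArrayRejectsWrongElement() {
        Object[] alias = new String[2];
        try {
            alias[0] = 42; // autoboxed Integer into a String[]
            return false;
        } catch (ArrayStoreException expected) {
            return true;
        }
    }

    /** The polluted slot fails only later, at the point of use, far from the faulty store. */
    static boolean pollutedElementFailsOnUse() {
        Optional<String>[] slots = polluted();
        try {
            String value = slots[0].get(); // compiler-inserted cast to String fails here
            return value == null;          // not reached
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(polluted()[0].isPresent());       // true: the bad store went unnoticed
        System.out.println(plainArrayRejectsWrongElement()); // true
        System.out.println(pollutedElementFailsOnUse());     // true
    }
}
```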


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode

2019-09-23 Thread GitBox
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r327059906
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##
 @@ -18,30 +18,163 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 
 import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A special record-oriented runtime result writer only for broadcast mode.
  *
- * The BroadcastRecordWriter extends the {@link RecordWriter} and handles {@link #emit(IOReadableWritable)}
- * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more efficient way.
+ * The BroadcastRecordWriter extends the {@link RecordWriter} and maintain a single {@link BufferBuilder}
+ * for all the channels. Then the serialization results need be copied only once to this buffer which would be
+ * shared for all the channels in a more efficient way.
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
 public class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
+   /** The current buffer builder shared for all the channels. */
+   private Optional<BufferBuilder> bufferBuilder = Optional.empty();
 
 Review comment:
   Optional fields are [discouraged](http://mail-archives.apache.org/mod_mbox/flink-dev/201908.mbox/%3CCAA_61Xo9oif8RjJgvxFNo%2Bua7_DoFksCy_5c_NOnQLWfS4-8qA%40mail.gmail.com%3E).
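Instead of an `Optional` field, the shared builder can be held in a plain field where `null` means "no builder is open" (such fields are commonly annotated `@Nullable`). The following is a minimal sketch under that assumption, not the PR's `BroadcastRecordWriter`. `SharedBuilder` is a hypothetical fixed-capacity stand-in for `BufferBuilder`, and the counters stand in for requesting a builder and creating one consumer per channel. It also shows the "single builder for all the channels" idea from the class Javadoc: a broadcast requests one builder per filled buffer, regardless of the number of channels.

```java
/**
 * Hypothetical sketch: a single builder shared by all channels,
 * held in a nullable field instead of an Optional.
 */
public class BroadcastWriterSketch {

    /** Minimal stand-in for a fixed-capacity buffer builder. */
    static final class SharedBuilder {
        private final int capacity;
        private int size;

        SharedBuilder(int capacity) {
            this.capacity = capacity;
        }

        /** Appends as many bytes as fit and returns how many were written. */
        int append(int numBytes) {
            int written = Math.min(numBytes, capacity - size);
            size += written;
            return written;
        }

        boolean isFull() {
            return size == capacity;
        }
    }

    private final int numberOfChannels;
    private final int bufferSize;

    /** The current builder shared by all channels, or null if none is open. */
    private SharedBuilder bufferBuilder;

    int requestedBuilders;
    int createdConsumers;
    int finishedBuilders;

    BroadcastWriterSketch(int numberOfChannels, int bufferSize) {
        if (numberOfChannels <= 0 || bufferSize <= 0) {
            throw new IllegalArgumentException("channels and buffer size must be positive");
        }
        this.numberOfChannels = numberOfChannels;
        this.bufferSize = bufferSize;
    }

    /** Copies a serialized record of the given length once; all channels read the same builder. */
    void broadcastEmit(int recordLength) {
        int remaining = recordLength;
        while (remaining > 0) {
            if (bufferBuilder == null) {
                bufferBuilder = new SharedBuilder(bufferSize);
                requestedBuilders++;                  // one request serves every channel
                createdConsumers += numberOfChannels; // one read view per channel
            }
            remaining -= bufferBuilder.append(remaining);
            if (bufferBuilder.isFull()) {
                finishedBuilders++;                   // stands in for finishing the builder
                bufferBuilder = null;                 // released; the next write requests a new one
            }
        }
    }

    boolean hasBufferBuilder() {
        return bufferBuilder != null;
    }

    public static void main(String[] args) {
        BroadcastWriterSketch writer = new BroadcastWriterSketch(4, 8);
        writer.broadcastEmit(5);
        writer.broadcastEmit(5); // fills the first builder, spills 2 bytes into a second one
        System.out.println(writer.requestedBuilders + " " + writer.createdConsumers); // 2 8
    }
}
```

The `null` check replaces `isPresent()`, and clearing the field replaces `Optional.empty()`, so no wrapper object is allocated per builder.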
   

