[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623432#comment-16623432
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski closed pull request #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 25d292771d0..6eebbbe88eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
/**
-* Sets a (next) target buffer to use and continues writing remaining 
data
-* to it until it is full.
+* Copies the intermediate data serialization buffer to the given 
target buffer.
 *
 * @param bufferBuilder the new target buffer to use
 * @return how much information was written to the target buffer and
 * whether this buffer is full
 */
-   SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder 
bufferBuilder) throws IOException;
+   SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
+
+   /**
+* Checks to decrease the size of intermediate data serialization 
buffer after finishing the
+* whole serialization process including {@link 
#serializeRecord(IOReadableWritable)} and
+* {@link #copyToBufferBuilder(BufferBuilder)}.
+*/
+   void prune();
 
/**
-* Clear and release internal state.
+* Supports copying an intermediate data serialization buffer to 
multiple target buffers
+* by resetting its initial position before each copying.
 */
-   void clear();
+   void reset();
 
/**
 * @return true if has some serialized data pending copying to 
the result {@link BufferBuilder}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index c4ab53f4b3a..ba2ed0133fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -20,11 +20,8 @@
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -32,7 +29,7 @@
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link 
#continueWritingWithNextBufferBuilder(BufferBuilder)}.
+ * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
  *
  * @param  The type of the records that are serialized.
  */
@@ -50,10 +47,6 @@
/** Intermediate buffer for length serialization. */
private final ByteBuffer lengthBuffer;
 
-   /** Current target {@link Buffer} of the serializer. */
-   @Nullable
-   private BufferBuilder targetBuffer;
-
public SpanningRecordSerializer() {
serializationBuffer = new DataOutputSerializer(128);
 
@@ -66,15 +59,12 @@ public SpanningRecordSerializer() {
}
 
/**
-* Serializes the complete record to an intermediate data serialization
-* buffer and starts copying it to the target buffer (if available).
+* Serializes 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623429#comment-16623429
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-423499421
 
 
   @NicoK gave me his LGTM, so I'm merging this :) Thank you @zhijiangW for 
the contribution and the time spent on the feature/PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via a 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a costly operation, so we can gain a lot 
> by serializing the output only once.
> I would suggest the following changes to realize it:
>  # Only one {{RecordSerializer}} is created in the {{RecordWriter}} for all 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all selected channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for the different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (these buffers are only pruned back to 
> 128B once they have grown beyond 5MiB!).
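
To make the three steps above concrete, here is a minimal, hypothetical sketch of the
"serialize once, copy per channel" flow. The type names below are simplified stand-ins
rather than Flink's actual classes; the real RecordWriter additionally requests new
BufferBuilders in a loop whenever a target buffer fills up (see the review comments
further down):

```
import java.io.IOException;
import java.nio.ByteBuffer;

// Minimal stand-in for the reworked RecordSerializer contract described in the diff above.
interface SingleRecordSerializer<T> {
    void serializeRecord(T record) throws IOException; // serialize into one shared intermediate buffer
    void reset();                                      // rewind the read position before each copy
    void copyToBufferBuilder(ByteBuffer target);       // copy the serialized bytes into a channel buffer
    void prune();                                      // shrink the intermediate buffer if it grew large
}

final class SingleSerializerWriterSketch<T> {
    private final SingleRecordSerializer<T> serializer;
    private final ByteBuffer[] channelBuffers;         // stand-ins for the per-channel BufferBuilders

    SingleSerializerWriterSketch(SingleRecordSerializer<T> serializer, int numChannels, int bufferSize) {
        this.serializer = serializer;
        this.channelBuffers = new ByteBuffer[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelBuffers[i] = ByteBuffer.allocate(bufferSize);
        }
    }

    void emit(T record, int[] targetChannels) throws IOException {
        serializer.serializeRecord(record);            // 1) serialize exactly once
        for (int channel : targetChannels) {
            serializer.reset();                        // 2) rewind so the same bytes can be copied again
            serializer.copyToBufferBuilder(channelBuffers[channel]);
        }
        serializer.prune();                            // 3) give back an oversized intermediate buffer
    }
}
```

In the PR itself, emit(T) calls serializeRecord(record) once and then
copyFromSerializerToTargetChannel(channel) for each selected channel, pruning the shared
buffer afterwards, as the quoted diffs in the later comments show.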



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614627#comment-16614627
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-421305505
 
 
   @pnowojski, I have squashed the commits addressing your latest concerns! :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614621#comment-16614621
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217663737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) 
throws IOException, In
}
 
private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   checkState(!bufferBuilders[targetChannel].isPresent());
 
 Review comment:
   The call from `getBufferBuilder` does not need this `checkState`. But for the 
call from `copyFromSerializerToTargetChannel`, it may be necessary to keep this 
check to avoid bugs.
   
   I removed it just to reduce some overhead. I will restore this check. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614604#comment-16614604
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217661048
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -113,11 +113,7 @@ public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   serializer.serializeRecord(record);
-
-   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
-   serializer.prune();
-   }
+   emit(record, new int[] { rng.nextInt(numChannels) });
 
 Review comment:
   got it


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614582#comment-16614582
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217655518
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) 
throws IOException, In
}
 
private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   checkState(!bufferBuilders[targetChannel].isPresent());
 
 Review comment:
   This `checkState` was preventing some bugs and data losses. Maybe replace it 
with:
   ```
   checkState(!bufferBuilders[targetChannel].isPresent() || 
bufferBuilders[targetChannel].isFinished());
   ```
   ?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614577#comment-16614577
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217654112
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -113,11 +113,7 @@ public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   serializer.serializeRecord(record);
-
-   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
-   serializer.prune();
-   }
+   emit(record, new int[] { rng.nextInt(numChannels) });
 
 Review comment:
   Wait a minute here. 
   
   ```
   emit(record, new int[] { rng.nextInt(numChannels) });
   ```
   
   @NicoK this is an actual performance regression. Creating a single-element int 
array (once per record!) actually reduces the throughput by about 10%-20% 
(tested during one of the hackathons). Please revert to using 
`copyFromSerializerToTargetChannel(rng.nextInt(numChannels))`.
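
   For illustration only, a hedged micro-sketch of the allocation difference (hypothetical
code, not from the PR): allocating a fresh one-element array per record produces garbage on
the per-record hot path, which either the direct single-channel call above or a preallocated
array avoids.

   ```
   import java.util.Random;

   // Hypothetical illustration of the hot-path allocation being discussed.
   final class RandomEmitSketch {
       private final Random rng = new Random();
       private final int numChannels = 4;
       private final int[] reusableChannel = new int[1];   // allocated once, reused per record

       // Variant flagged in the review: allocates a new int[1] for every latency marker.
       void randomEmitAllocating(Object record) {
           emit(record, new int[] { rng.nextInt(numChannels) });
       }

       // Allocation-free variant: reuse a preallocated array (or, as suggested above,
       // call the single-channel copy path directly and skip the array entirely).
       void randomEmitReusing(Object record) {
           reusableChannel[0] = rng.nextInt(numChannels);
           emit(record, reusableChannel);
       }

       private void emit(Object record, int[] targetChannels) {
           // serialize once, then copy to each target channel (omitted in this sketch)
       }
   }
   ```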


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614578#comment-16614578
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217655518
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) 
throws IOException, In
}
 
private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   checkState(!bufferBuilders[targetChannel].isPresent());
 
 Review comment:
   This `checkState` was preventing some bugs and data losses. Maybe replace it 
with:
   `checkState(!bufferBuilders[targetChannel].isPresent() || 
bufferBuilders[targetChannel].isFinished());`?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614469#comment-16614469
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-421255951
 
 
   @NicoK, I updated the code to cover your new comments.
   BTW, the Travis failure is unrelated to my changes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614464#comment-16614464
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217619644
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   Thanks for this further suggestion!
   
   I agree with the idea of keeping the logic inside the loop simple and reducing 
the overhead related to the `BufferBuilder` array.
   
   I adjusted the process a bit differently from the code above. I think 
`bufferBuilders[targetChannel] = Optional.ofNullable(bufferBuilder)` does not 
need to be called on every copy, because it only matters once we enter the 
`while` loop. Considering the common case of small records, one `BufferBuilder` 
can hold many serialization 
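
   For orientation, a hedged sketch of the loop shape under discussion; the method and
field names are taken from the PR's diffs, but the body is illustrative rather than the
merged code:

   ```
   // Illustrative shape of copyFromSerializerToTargetChannel(targetChannel): the per-channel
   // BufferBuilder slot only needs to be refreshed when a new builder is requested inside the
   // loop; for small records the loop body is usually never entered at all.
   private boolean copyFromSerializerToTargetChannel(int targetChannel)
           throws IOException, InterruptedException {
       serializer.reset();                                          // rewind the shared serialization buffer

       boolean pruneTriggered = false;
       BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
       SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
       while (result.isFullBuffer()) {
           tryFinishCurrentBufferBuilder(targetChannel);            // hand the full buffer downstream

           // If the record was fully copied, stop here; requesting another buffer now could
           // stall the pipeline, and the oversized intermediate buffer may then be pruned.
           if (result.isFullRecord()) {
               pruneTriggered = true;
               break;
           }

           bufferBuilder = requestNewBufferBuilder(targetChannel);  // only here is the slot refreshed
           result = serializer.copyToBufferBuilder(bufferBuilder);
       }
       return pruneTriggered;
   }
   ```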

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614390#comment-16614390
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605192
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, channelSelector.selectChannels(record, 
numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   private void emit(T record, int[] targetChannels) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
+   boolean pruneAfterCopying = false;
+   for (int channel : targetChannels) {
+   if (copyFromSerializerToTargetChannel(channel)) {
+   pruneAfterCopying = true;
+   }
+   }
 
-   SerializationResult result = serializer.addRecord(record);
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   if (pruneAfterCopying) {
+   serializer.prune();
+   }
+   }
 
+   /**
+* @param targetChannel
+* @return true if the intermediate serialization buffer 
should be pruned
+*/
+   private boolean copyFromSerializerToTargetChannel(int targetChannel) 
throws IOException, InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   boolean pruneTriggered = false;
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-  

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614389#comment-16614389
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605083
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   Yes, that makes sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613603#comment-16613603
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217406052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   I guess I was worried about the same thing as @pnowojski ... the expanded 
method here will actually look like this:
   ```
boolean pruneTriggered = false;
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
if (bufferBuilders[targetChannel].isPresent()) {
bufferBuilder = 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613604#comment-16613604
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217397453
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, channelSelector.selectChannels(record, 
numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
 
 Review comment:
   code duplication: why not use this?
   ```
   emit(record, new int[] { rng.nextInt(numChannels) });
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613602#comment-16613602
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217404839
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, channelSelector.selectChannels(record, 
numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emit(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   private void emit(T record, int[] targetChannels) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
+   boolean pruneAfterCopying = false;
+   for (int channel : targetChannels) {
+   if (copyFromSerializerToTargetChannel(channel)) {
+   pruneAfterCopying = true;
+   }
+   }
 
-   SerializationResult result = serializer.addRecord(record);
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   if (pruneAfterCopying) {
+   serializer.prune();
+   }
+   }
 
+   /**
+* @param targetChannel
+* @return true if the intermediate serialization buffer 
should be pruned
+*/
+   private boolean copyFromSerializerToTargetChannel(int targetChannel) 
throws IOException, InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   boolean pruneTriggered = false;
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-  

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613601#comment-16613601
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217415039
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   I meant using
   ```
   final RecordWriter writer = isBroadcastEmit ?
new RecordWriter<>(partitionWriter) :
new RecordWriter<>(partitionWriter, new Broadcast<>());
   ```
   This would also check that `broadcastEmit()` does not rely on a broadcasting 
channel selector.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611470#comment-16611470
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-420486898
 
 
   Thanks for that. I will also try my best to monitor the performance changes for 
these cases later. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606668#comment-16606668
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-419312269
 
 
   @pnowojski, I updated the code based on your comments and squashed it into the last commit. Looking forward to your benchmark results! :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606651#comment-16606651
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215832704
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
 */
private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   Optional bufferBuilderOpt = 
bufferBuilders[targetChannel];
 
 Review comment:
   I also have not seen throughput improvements from this change, so I will revert it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605753#comment-16605753
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215611369
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
 */
private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   Optional bufferBuilderOpt = 
bufferBuilders[targetChannel];
 
 Review comment:
   Rename to `bufferBuilder` (encoding the type in a variable name is not the best practice).
   
   Have you seen throughput improvements from introducing this local variable? If not, maybe revert the change?
   
   FYI: when I was writing this code, I didn't see any performance improvement (and I was testing this exact change). Removing one extra CPU cache read (the second `bufferBuilders[targetChannel]` access will either be optimised out or be a read from CPU caches/registers) usually hardly matters compared to taking locks :(
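   For what it's worth, a claim like this can be checked in isolation with a small JMH microbenchmark along the following lines (a sketch assuming a JMH dependency is available; `ArrayAccessBenchmark` and its contents are made up for illustration and are not part of the PR):
   ```
   import java.util.Optional;

   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.State;

   /** Hypothetical JMH sketch: local variable vs. repeated array element access. */
   @State(Scope.Thread)
   public class ArrayAccessBenchmark {

   	private final Optional<Long>[] slots = createSlots();

   	@SuppressWarnings("unchecked")
   	private static Optional<Long>[] createSlots() {
   		Optional<Long>[] slots = new Optional[64];
   		for (int i = 0; i < slots.length; i++) {
   			slots[i] = Optional.of((long) i);
   		}
   		return slots;
   	}

   	@Benchmark
   	public long twoArrayReads() {
   		long sum = 0;
   		for (int i = 0; i < slots.length; i++) {
   			if (slots[i].isPresent()) {
   				sum += slots[i].get(); // second read of slots[i]
   			}
   		}
   		return sum;
   	}

   	@Benchmark
   	public long localVariable() {
   		long sum = 0;
   		for (int i = 0; i < slots.length; i++) {
   			Optional<Long> slot = slots[i]; // single read, kept in a local
   			if (slot.isPresent()) {
   				sum += slot.get();
   			}
   		}
   		return sum;
   	}
   }
   ```
   In line with the comment above, the JIT will usually hoist the second read anyway, so the two variants tend to score the same.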


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605751#comment-16605751
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215615035
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,101 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emitToTargetChannels(record, 
channelSelector.selectChannels(record, numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emitToTargetChannels(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if (copyToTargetBuffers(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   private void emitToTargetChannels(T record, int[] targetChannels) 
throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
+   boolean pruneAfterCopying = false;
+   for (int channel : targetChannels) {
+   if (copyToTargetBuffers(channel)) {
+   pruneAfterCopying = true;
+   }
+   }
 
-   SerializationResult result = serializer.addRecord(record);
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   if (pruneAfterCopying) {
+   serializer.prune();
+   }
+   }
 
+   /**
+* Copies the intermediate serialization buffer to the BufferBuilder of the target channel, and also
+* checks whether to prune the intermediate buffer if the target BufferBuilder is full and the record
+* has been fully copied.
+*
+* @param targetChannel the target channel to get BufferBuilder
+* @return true if the intermediate serialization buffer 
should be pruned
+*/
+   private boolean copyToTargetBuffers(int targetChannel) throws 
IOException, InterruptedException {
 
 Review comment:
   This is the third time I'm looking at this PR, and the third time I had to think for a minute about what this method does. I keep forgetting that `serializer` is a class field and that this method copies from it.
   
   Maybe rename it to `copyFromSerializerToTargetChannel`? IMO the rename would allow us to drop most of the Javadoc and simplify it to just:
```
/**
 * @param targetChannel
 * @return true if the intermediate serialization buffer 
should be pruned
 */
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605750#comment-16605750
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215611481
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
 */
private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   Optional bufferBuilderOpt = 
bufferBuilders[targetChannel];
+   if (bufferBuilderOpt.isPresent()) {
bufferBuilders[targetChannel] = Optional.empty();
-   numBytesOut.inc(bufferBuilder.finish());
+   numBytesOut.inc(bufferBuilderOpt.get().finish());
numBuffersOut.inc();
}
}
-   
+
/**
 * The {@link BufferBuilder} may already exist if not filled up last 
time, otherwise we need
 * request a new one for this target channel.
 */
-   @Nonnull
private BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   return bufferBuilders[targetChannel].get();
+   Optional bufferBuilderOpt = 
bufferBuilders[targetChannel];
+   if (bufferBuilderOpt.isPresent()) {
+   return bufferBuilderOpt.get();
} else {
return requestNewBufferBuilder(targetChannel);
}
}
 
-   @Nonnull
private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
checkState(!bufferBuilders[targetChannel].isPresent());
-
BufferBuilder bufferBuilder = 
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
bufferBuilders[targetChannel] = Optional.of(bufferBuilder);

targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 
targetChannel);
return bufferBuilder;
}
 
private void closeBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   bufferBuilders[targetChannel].get().finish();
+   Optional bufferBuilderOpt = 
bufferBuilders[targetChannel];
 
 Review comment:
   ditto: rename or inline


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605754#comment-16605754
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215613344
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,101 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emitToTargetChannels(record, 
channelSelector.selectChannels(record, numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emitToTargetChannels(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if (copyToTargetBuffers(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   private void emitToTargetChannels(T record, int[] targetChannels) 
throws IOException, InterruptedException {
 
 Review comment:
   Maybe rename it just to `emit`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605752#comment-16605752
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215611435
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
 */
private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   Optional bufferBuilderOpt = 
bufferBuilders[targetChannel];
+   if (bufferBuilderOpt.isPresent()) {
bufferBuilders[targetChannel] = Optional.empty();
-   numBytesOut.inc(bufferBuilder.finish());
+   numBytesOut.inc(bufferBuilderOpt.get().finish());
numBuffersOut.inc();
}
}
-   
+
/**
 * The {@link BufferBuilder} may already exist if not filled up last 
time, otherwise we need
 * request a new one for this target channel.
 */
-   @Nonnull
private BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   return bufferBuilders[targetChannel].get();
+   Optional bufferBuilderOpt = 
bufferBuilders[targetChannel];
 
 Review comment:
   ditto: rename or inline


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598589#comment-16598589
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-417629346
 
 
   @NicoK @pnowojski FYI, I have updated the code to cover the above comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597293#comment-16597293
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW edited a comment on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-417269629
 
 
   @pnowojski  Thank you for the reminder that `tryFinishCurrentBufferBuilder()` will also be called by `broadcastEvent()`, so I prefer keeping the current mode. :)
   
   I did run the benchmark on my local machine, and it is actually not very stable sometimes.
   I will try to update the code to cover all the above modifications over the weekend. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597292#comment-16597292
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-417269629
 
 
   Thank you for the reminder that `tryFinishCurrentBufferBuilder()` will also be called by `broadcastEvent()`, so I prefer keeping the current mode. :)
   
   I did run the benchmark on my local machine, and it is actually not very stable sometimes.
   I will try to update the code to cover all the above modifications over the weekend. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597206#comment-16597206
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213949310
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   When I was reviewing this part last time, I was only concerned that `getBufferBuilder(targetChannel);` and `tryFinishCurrentBufferBuilder(targetChannel);` together access `bufferBuilders[targetChannel]` twice. However, the performance penalty (if any) shouldn't matter, and I liked the new code more :)
   
   Regarding `finishCurrentBufferBuilder`: `tryFinishCurrentBufferBuilder` is currently also used in one more place (`broadcastEvent()`), so we would need to add an `if` check there preceding the call 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595131#comment-16595131
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-416629286
 
 
   Thanks for your reviews @NicoK 
   
   Sorry for the late updates on this PR; I have been a little busy recently, also with the benchmark results.
   
   For my own broadcast benchmark, these changes give an obvious improvement. But for non-broadcast cases, the throughput of `StreamNetworkThroughputBenchmarkExecutor` seems slightly lower than before. After I adjusted the code to keep the same `pruneBuffer()` behaviour as before, the results were a bit better, but there is still a small decrease (sometimes 1%) compared to before. So I guess another reason is that, in the past, the `RecordSerializer` maintained the `BufferBuilder` internally and kept copying multiple serialization results into it until it was full. Now, for each record, we have to get the `BufferBuilder` from the array in `RecordWriter` and then pass it to the `RecordSerializer`. This is the key difference and overhead, because the `RecordSerializer` is now stateless. So I am still trying to improve other parts to compensate for this loss.
   
   I will try to update this PR soon based on all the above comments!
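   
   To illustrate the extra per-record step described above, a rough sketch (hypothetical classes and types, not the actual Flink code): the writer now owns the per-channel buffers and must hand one to the stateless serializer for every record, whereas the old per-channel serializers kept their target buffer internally.
   ```
   import java.util.Optional;

   /** Hypothetical sketch of the per-record lookup in the new, stateless design. */
   final class WriterSketch {

   	// The writer, not the serializer, tracks the current buffer of each channel.
   	private final Optional<StringBuilder>[] bufferBuilders;

   	@SuppressWarnings("unchecked")
   	WriterSketch(int numChannels) {
   		bufferBuilders = new Optional[numChannels];
   		for (int i = 0; i < numChannels; i++) {
   			bufferBuilders[i] = Optional.empty();
   		}
   	}

   	void copyToChannel(String serializedBytes, int targetChannel) {
   		// Per record and per channel, the writer looks up (or requests) the target buffer
   		// and passes it to the stateless serializer for the copy step. In the old design
   		// the per-channel serializer held this buffer itself, so no lookup was needed here.
   		StringBuilder bufferBuilder = bufferBuilders[targetChannel].orElseGet(StringBuilder::new);
   		bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
   		bufferBuilder.append(serializedBytes);
   	}
   }
   ```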


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595110#comment-16595110
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213343581
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   You raised a good question!
   1. Considering `tryFinishCurrentBufferBuilder()`, the logic is somewhat different from before. In the past, the buffer builder could be empty when `tryFinishCurrentBufferBuilder()` was called, and it then returned a boolean value to indicate the result. But now we know the buffer builder is always present when `tryFinishCurrentBufferBuilder()` is called, so changing it to `finishCurrentBufferBuilder()` seems more appropriate, and adding the check code instead, as follows:
 ```
   private 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595115#comment-16595115
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213345034
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -169,25 +167,26 @@ private void test(Util.MockRecords records, int 
segmentSize) throws Exception {
 
// 
-
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-
+   BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
int numBytes = 0;
for (SerializationTestType record : records) {
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(record);
+   serializer.serializeRecord(record);
+   RecordSerializer.SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
numBytes += record.length() + serializationOverhead;
 
if (numBytes < segmentSize) {

Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
} else if (numBytes == segmentSize) {

Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL,
 result);
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = 
createBufferBuilder(segmentSize);
numBytes = 0;
} else {

Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
 
while (result.isFullBuffer()) {
numBytes -= segmentSize;
-   result = 
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = 
createBufferBuilder(segmentSize);
+   result = 
serializer.copyToBufferBuilder(bufferBuilder);
 
 Review comment:
   Makes sense.
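   
   For readers following the test change: the loop under discussion handles records that span several fixed-size buffers. A tiny self-contained sketch of that copy pattern (a hypothetical helper for illustration, not the Flink serializer itself):
   ```
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical sketch: copying one serialized record across several fixed-size buffers. */
   final class SpanningCopySketch {

   	static List<byte[]> copyAcrossBuffers(byte[] serializedRecord, int bufferSize) {
   		List<byte[]> targetBuffers = new ArrayList<>();
   		ByteBuffer source = ByteBuffer.wrap(serializedRecord); // reset position before copying
   		while (source.hasRemaining()) {
   			// Like copyToBufferBuilder(): fill the current buffer; if the record does not fit
   			// (the "partial record, segment full" case), continue with a fresh buffer.
   			byte[] target = new byte[Math.min(bufferSize, source.remaining())];
   			source.get(target);
   			targetBuffers.add(target);
   		}
   		return targetBuffers;
   	}
   }
   ```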


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595114#comment-16595114
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213349150
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   Maybe I am not getting your point correctly.
   I just want to verify the two different interface methods on the same `RecordWriter` instance, that is `RecordWriter#emit()` and `RecordWriter#broadcastEmit()`, in two separate cases, because both methods are involved in this serialization improvement.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595111#comment-16595111
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213344540
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -40,25 +41,25 @@
 
@Test
public void testHasSerializedData() throws IOException {
-   final int segmentSize = 16;
-
final SpanningRecordSerializer 
serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
Assert.assertTrue(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+   serializer.copyToBufferBuilder(bufferBuilder1);
Assert.assertFalse(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-   serializer.addRecord(randomIntRecord);
+   final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+   serializer.serializeRecord(randomIntRecord);
 
 Review comment:
   Good idea.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).
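
As a rough illustration of the three steps listed above, here is a minimal, self-contained Java sketch (toy types only, not the actual {{RecordWriter}}/{{RecordSerializer}} classes) of serializing a record once into an intermediate buffer and then copying those bytes into a separate target buffer per channel:

```
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SerializeOnceSketch {

    public static void main(String[] args) {
        byte[] serialized = "some record payload".getBytes(StandardCharsets.UTF_8);
        int numChannels = 4;

        // 1) serialize once into a single intermediate buffer
        ByteBuffer intermediate = ByteBuffer.wrap(serialized);

        // 2) copy the same serialized bytes into one target buffer per channel
        List<ByteBuffer> channelBuffers = new ArrayList<>();
        for (int channel = 0; channel < numChannels; channel++) {
            intermediate.rewind(); // reset the read position before each copy
            ByteBuffer target = ByteBuffer.allocate(intermediate.remaining());
            target.put(intermediate);
            target.flip();
            channelBuffers.add(target);
        }

        System.out.println("Copied " + serialized.length + " serialized bytes into "
            + channelBuffers.size() + " channel buffers");
    }
}
```

The real implementation copies into per-channel {{BufferBuilder}}s and must handle records spanning multiple buffers, but the copy-after-single-serialization pattern is the same.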



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595113#comment-16595113
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213346998
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
+   final RecordDeserializer deserializer = 
new SpillingAdaptiveSpanningRecordDeserializer<>(
+   new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+   final ArrayDeque serializedRecords = new 
ArrayDeque<>();
+   final Iterable records = 
Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+   for (SerializationTestType record : records) {
+   serializedRecords.add(record);
+
+   if (isBroadcastEmit) {
+   writer.broadcastEmit(record);
+   } else {
+   writer.emit(record);
+   }
+   }
+
+   final int requiredBuffers = numValues / (bufferSize / (4 + 
serializationLength));
+   for (int i = 0; i < numChannels; i++) {
 
 Review comment:
   Yes, I will try.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595112#comment-16595112
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213352672
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+   private static class Broadcast implements 
ChannelSelector {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   Yes, I just copied the code from `BroadcastPartitioner`, and I will simplify this code in a hotfix commit later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit by using a single serializer for all channels is that 
> we get a potentially significant reduction on heap space overhead from fewer 
> intermediate serialization buffers (only once we got over 5MiB, these buffers 
> were pruned back to 128B!).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593710#comment-16593710
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK edited a comment on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-416236083
 
 
   I don't quite get the argument about the `pruneBuffer()` call having this much overhead... at least for small records, since its implementation only prunes if the buffer is larger than 5MiB:
   ```
public void pruneBuffer() {
    if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
        ...
   ```
   
   well, actually, this brings it down to the overhead from 
`org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer#prune`:
   ```
@Override
public void prune() {
    serializationBuffer.pruneBuffer();
    dataBuffer = serializationBuffer.wrapAsByteBuffer();
}
   ```
   but to be honest, `serializationBuffer.wrapAsByteBuffer();` also just sets 
two position numbers
   -> if we are that sensitive to small changes, we indeed should think about 
optimising this one call to `tryFinishCurrentBufferBuilder()` which I mentioned 
above(?)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593696#comment-16593696
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-416236083
 
 
   I don't quite get the argument about the `pruneBuffer()` call having this much overhead... at least for small records, since its implementation only prunes if the buffer is larger than 5MiB:
   ```
public void pruneBuffer() {
    if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
        ...
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593688#comment-16593688
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212970682
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -106,17 +102,19 @@ public boolean equals(Object obj) {
}
};
 
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
+   serializer.serializeRecord(emptyRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593681#comment-16593681
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212951771
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   actually, here, we not only know that the buffer builder is present, we also already have its reference (in contrast to `tryFinishCurrentBufferBuilder()`) and don't need to update the `bufferBuilders` field until after the `while` loop - I'm not sure whether this is worth optimising, though (@pnowojski?)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593686#comment-16593686
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212969997
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -40,25 +41,25 @@
 
@Test
public void testHasSerializedData() throws IOException {
-   final int segmentSize = 16;
-
final SpanningRecordSerializer 
serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
Assert.assertTrue(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+   serializer.copyToBufferBuilder(bufferBuilder1);
Assert.assertFalse(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-   serializer.addRecord(randomIntRecord);
+   final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+   serializer.serializeRecord(randomIntRecord);
+   serializer.copyToBufferBuilder(bufferBuilder2);
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593683#comment-16593683
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212980692
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+   private static class Broadcast implements 
ChannelSelector {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   Actually, this is a copy of 
`org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner` which is 
in the `flink-streaming-java` submodule, though.
   And in general it is good to cache this rather than building a new array for 
every record...
   
   Using `returnChannel.length == numberOfOutputChannels` makes sense though - 
@zhijiangW can you also create a hotfix commit changing this in 
`org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner`?
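   
   A hypothetical standalone sketch of that caching pattern (assumed class name, not the actual `BroadcastPartitioner`), where the cached array is rebuilt only when the channel count changes:
   
   ```
/**
 * Build the all-channels array once and reuse it for every record,
 * rebuilding only if the number of output channels changes.
 */
class CachingBroadcastSelector {

    private int[] returnChannel = new int[0];

    int[] selectChannels(int numberOfOutputChannels) {
        if (returnChannel.length != numberOfOutputChannels) {
            returnChannel = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnChannel[i] = i;
            }
        }
        return returnChannel;
    }
}
   ```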


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593687#comment-16593687
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212970648
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -106,17 +102,19 @@ public boolean equals(Object obj) {
}
};
 
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
+   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.copyToBufferBuilder(bufferBuilder1));
 
-   result = serializer.addRecord(emptyRecord);
-   
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+   serializer.serializeRecord(emptyRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593680#comment-16593680
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212974745
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
 
 Review comment:
   The `RecordWriter` instance should be different depending on 
`isBroadcastEmit` to really separate these two cases?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593682#comment-16593682
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212969980
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -40,25 +41,25 @@
 
@Test
public void testHasSerializedData() throws IOException {
-   final int segmentSize = 16;
-
final SpanningRecordSerializer 
serializer = new SpanningRecordSerializer<>();
final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
Assert.assertFalse(serializer.hasSerializedData());
 
-   serializer.addRecord(randomIntRecord);
+   serializer.serializeRecord(randomIntRecord);
Assert.assertTrue(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+   serializer.copyToBufferBuilder(bufferBuilder1);
Assert.assertFalse(serializer.hasSerializedData());
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-   serializer.addRecord(randomIntRecord);
+   final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+   serializer.serializeRecord(randomIntRecord);
 
 Review comment:
   why don't you use `serializer.reset()` here? (serialize only once as in 
production code)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593685#comment-16593685
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212972706
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 ##
 @@ -169,25 +167,26 @@ private void test(Util.MockRecords records, int 
segmentSize) throws Exception {
 
// 
-
 
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-
+   BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
int numBytes = 0;
for (SerializationTestType record : records) {
-   RecordSerializer.SerializationResult result = 
serializer.addRecord(record);
+   serializer.serializeRecord(record);
+   RecordSerializer.SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
numBytes += record.length() + serializationOverhead;
 
if (numBytes < segmentSize) {

Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
} else if (numBytes == segmentSize) {

Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL,
 result);
-   
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = 
createBufferBuilder(segmentSize);
numBytes = 0;
} else {

Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
 
while (result.isFullBuffer()) {
numBytes -= segmentSize;
-   result = 
serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+   bufferBuilder = 
createBufferBuilder(segmentSize);
+   result = 
serializer.copyToBufferBuilder(bufferBuilder);
 
 Review comment:
   I know, this wasn't checked before, but should we actually also check for a 
full record after this `while` loop?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593684#comment-16593684
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212952611
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   I was a bit skeptical about the removal of the return value of `tryFinishCurrentBufferBuilder()` at first, but I don't see a reason to tie breaking out of the loop on full records to actually having a buffer builder present - once we have completed writing the whole record, we can break out.
   -> therefore it is ok and probably better than before


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582304#comment-16582304
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413491900
 
 
   Nice find. I missed that :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582193#comment-16582193
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413468551
 
 
   @pnowojski , I think I found the reason for the regression in some non-broadcast cases.
   
   The key point is when to call `RecordSerializer#prune()`, which is used to shrink the intermediate serialization buffer. Previously, the prune method was only called after the target copying buffer was full and the record was complete. Now we call prune after emitting each record in order to narrow down the intermediate buffer as soon as possible, so performance may regress in sensitive job scenarios. I will modify the code to keep the previous behavior of calling prune in the serializer.
   
   Thanks for running the benchmark that found this potential issue. I had only verified the broadcast scenarios in the benchmark before, and the obvious advantage there may have hidden this regression. :)
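   
   To make the two policies concrete, here is a toy, self-contained model (assumed names and threshold, not the actual serializer classes) contrasting pruning after every emitted record with pruning only once a completed record has filled its target buffer:
   
   ```
import java.util.Arrays;

/** Toy model of the pruning policies discussed above (not Flink code). */
class PruningPolicyModel {

    private static final int PRUNE_THRESHOLD = 5 * 1024 * 1024; // ~5 MiB, as in the quoted snippet
    private static final int INITIAL_CAPACITY = 128;

    private byte[] intermediate = new byte[INITIAL_CAPACITY];

    /** Grow the shared intermediate buffer to hold one serialized record. */
    void serializeOnce(byte[] serializedRecord) {
        if (serializedRecord.length > intermediate.length) {
            intermediate = Arrays.copyOf(intermediate, serializedRecord.length);
        }
        System.arraycopy(serializedRecord, 0, intermediate, 0, serializedRecord.length);
    }

    /**
     * Eager policy (pruning after every emitted record) adds one call per record
     * even when the buffer is tiny; the lazy policy to be restored calls prune()
     * only when a completed record has filled the current target buffer.
     */
    void maybePrune(boolean pruneAfterEveryRecord, boolean recordCompleteAndTargetFull) {
        if (pruneAfterEveryRecord || recordCompleteAndTargetFull) {
            prune();
        }
    }

    /** Shrink the backing array only if it grew past the threshold. */
    private void prune() {
        if (intermediate.length > PRUNE_THRESHOLD) {
            intermediate = new byte[INITIAL_CAPACITY];
        }
    }
}
   ```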


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580936#comment-16580936
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413167382
 
 
   @zhijiangW yes, those results from our internal benchmarking tool are a little bit strange, since I do not see any obvious place that could cause them. However, the regression was visible across the board in various cases (none of them used broadcasting extensively), and so far we haven't seen such a large false positive error.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580932#comment-16580932
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r210236329
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   So maybe keep it as it is. Maybe in the future someone will be struck by a 
better idea.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580829#comment-16580829
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW edited a comment on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413127511
 
 
   For the benchmark, I also created a new job with only a source and a map vertex, where the source broadcasts a Long value to all the maps. I executed the same job with parallelism 1:100 and 1:200 separately, and the throughput increases by 13% and 15% respectively, as shown below:
   
   ```
   parallelism | throughput (before)    | throughput (now)
   1:100       | 70.760 ± 10.557 ops/ms | 83.480 ± 1.967 ops/ms
   1:200       | 37.756 ± 1.170 ops/ms  | 43.316 ± 2.176 ops/ms
   ```
   
   In theory, we only reduce the number of serializations and do not introduce any time-costly operations. I will further verify with the `StreamNetworkThroughputBenchmarkExecutor` you mentioned and share the results once done. :)
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580825#comment-16580825
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413127511
 
 
   For the benchmark, I also created a new job with only a source and a map vertex, where the source broadcasts a Long value to all the maps. I executed the same job with parallelism 1:100 and 1:200 separately, and the throughput increases by 13% and 15% respectively, as shown below:
   parallelism | throughput (before)    | throughput (now)
   1:100       | 70.760 ± 10.557 ops/ms | 83.480 ± 1.967 ops/ms
   1:200       | 37.756 ± 1.170 ops/ms  | 43.316 ± 2.176 ops/ms
   
   In theory, we only reduce the number of serializations and do not introduce any time-costly operations. I will further verify with the `StreamNetworkThroughputBenchmarkExecutor` you mentioned and share the results once done. :)
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580814#comment-16580814
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r210195493
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   I agree with not exposing `RecordSerializer`'s internal private fields to `SerializedRecord` for safety concerns.
   
   Regarding referencing the `RecordSerializer` directly in `SerializedRecord`: the whole process in `RecordWriter` may look like this:
   SerializedRecord serializedRecord = serializer.serializeRecord(record);
   for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
       // reset position for multiple copying
       serializer.reset();
       BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
       CopyingResult result = serializedRecord.copyToBufferBuilder(bufferBuilder);
   }
   serializer.prune();
   
   The `SerializedRecord` only provides the `copyToBufferBuilder` method and wraps the internal class `CopyingResult`, but the actual copying operation is still done in `RecordSerializer`. So the `RecordSerializer` also needs to provide the `copyToBufferBuilder` method. From this point of view, we actually do not separate the two steps of `serialization` and `copy` from `RecordSerializer`, so I think maybe it is not really worth doing that. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579907#comment-16579907
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-412900643
 
 
   One piece of possibly bad news: I have run the benchmarks defined in https://github.com/dataArtisans/flink-benchmarks on this branch, and quite a lot of them show a performance regression. The worst was `StreamNetworkThroughputBenchmarkExecutor` with the `1,100ms` params, a regression of ~18%.
   
   Could you run those benchmarks locally and confirm whether that's the case?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579689#comment-16579689
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209921447
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   Re 1.
   
   I was thinking about something along those lines; however, I saw the need for `close()`, because currently `SpanningRecordSerializer` reuses, across multiple records, the `byte[]` that hides behind the `dataBuffer = serializationBuffer.wrapAsByteBuffer();` call. Is there some other way to release/return this array back to `RecordSerializer`? Or maybe that proves that `RecordSerializer` and `SerializedRecord` should indeed be one class?
   
   Re 2. 
   
   I'm not sure. This one seems like it would leak `RecordSerializer`'s private fields (via the `serializer` reference) to `SerializedRecord`.
   
   All in all, I'm not sure whether the improvement here is worth the effort, or whether we would even improve/simplify the code (especially this `close()` bothers me). I also agree that your changes around `addRecord`, `continueWritingWithNextBufferBuilder` and `copyToBufferBuilder` make this code easier overall, so I would be fine with merging it as is (modulo my other comments) :)
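   To make the `byte[]` reuse concrete, a minimal sketch of the pattern being discussed (an illustration only, not the actual `SpanningRecordSerializer` code; it omits the length buffer and pruning):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;

// One DataOutputSerializer whose backing byte[] is reused for every record; this is why
// a SerializedRecord handed out to the caller would need some close()/return hook before
// the next record is serialized into the same array.
class ReusingSerializerSketch {

	private final DataOutputSerializer serializationBuffer = new DataOutputSerializer(128);
	private ByteBuffer dataBuffer;

	void serializeRecord(IOReadableWritable record) throws IOException {
		serializationBuffer.clear();                          // rewind; the same backing array is reused
		record.write(serializationBuffer);                    // serialize the record into it
		dataBuffer = serializationBuffer.wrapAsByteBuffer();  // wraps the shared array, no copy
	}
}
```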


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579665#comment-16579665
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209289603
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}
 
+	/**
+	 * Tests that records are broadcast via {@link ChannelSelector} and
+	 * {@link RecordWriter#emit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+	}
+
+	/**
+	 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testBroadcastEmitRecord() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+	}
+
+	/**
+	 * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+	 * that is all the target channels can receive the whole outputs.
+	 *
+	 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+	 */
+	private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+		final int numChannels = 4;
+		final int bufferSize = 32;
+		final int numValues = 8;
+		final int serializationLength = 4;
+
+		@SuppressWarnings("unchecked")
+		final Queue<BufferConsumer>[] queues = new Queue[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			queues[i] = new ArrayDeque<>();
+		}
+
+		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+		final RecordWriter<SerializationTestType> writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
+		final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
+			new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
+		final Iterable<SerializationTestType> records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+		for (SerializationTestType record : records) {
+			serializedRecords.add(record);
+
+			if (isBroadcastEmit) {
+				writer.broadcastEmit(record);
+			} else {
+				writer.emit(record);
+			}
+		}
+
+		final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength));
+   for (int i = 0; i < numChannels; i++) {
 
 Review comment:
   Can you somehow extract the common logic of this method and `SpanningRecordSerializationTest#testSerializationRoundTrip(Iterable, int, RecordSerializer, RecordDeserializer)`? They share a lot of the core logic.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578048#comment-16578048
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-412469613
 
 
   @pnowojski , thanks for your reviews!
   I basically agree with your idea of separating the current `RecordSerializer` further, but considering the specific implementation, there are still some issues to be confirmed. After we reach an agreement, I will continue with the test issues. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578040#comment-16578040
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209551509
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   The previous `RecordSerializer` also confused me a lot and I have had the same experience as you, because the previous `addRecord` and `continueWritingWithNextBufferBuilder` methods could be called in arbitrary order and both returned a `SerializationResult`.
   
   In my current refactoring, the method `serializeRecord` must be called first, and then the method `copyToBufferBuilder` is called to return the final `SerializationResult`. I think this is a bit clearer than before.
   
   I agree that your idea above is good for separating these two methods further, but `RecordSerializer` and `SerializedRecord` may still be closely coupled. I think there are two ways to realize `SerializedRecord#copyToBufferBuilder`:
   1.
   ```
   public SerializedRecord(ByteBuffer lengthBuffer, ByteBuffer dataBuffer) {
   }

   CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder) {
       // copy lengthBuffer
       // copy dataBuffer
       // get CopyingResult
   }
   ```
   
   This way the `SerializedRecord` can only see `lengthBuffer` and `dataBuffer`, and cannot interact with `RecordSerializer`. Maybe we do not need to do anything in `SerializedRecord#close()`.
   
   2.
   ```
   public SerializedRecord(RecordSerializer serializer) {
   }

   CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder) {
       serializer.copyToBufferBuilder(bufferBuilder);
       // get CopyingResult
   }
   ```
   
   This way the `SerializedRecord` can see and interact with `RecordSerializer`, but the only difference seems to be that we separate `SerializedRecord` and `CopyingResult`. In my current implementation we hide the `SerializedRecord` and return the `SerializationResult`, which corresponds to `CopyingResult`, as the final result.
   
   What do you think of the above ways?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576424#comment-16576424
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209284512
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -188,24 +192,32 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 
/**
 	 * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
-	 *
-	 * @return true if some data were written
 	 */
-	private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer<T> serializer) {
-
-		if (!bufferBuilders[targetChannel].isPresent()) {
-			return false;
+	private void tryFinishCurrentBufferBuilder(int targetChannel) {
+		if (bufferBuilders[targetChannel].isPresent()) {
+			BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
+			bufferBuilders[targetChannel] = Optional.empty();
+			numBytesOut.inc(bufferBuilder.finish());
 		}
-		BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
-		bufferBuilders[targetChannel] = Optional.empty();
+	}
 
-		numBytesOut.inc(bufferBuilder.finish());
-		serializer.clear();
-		return true;
+	/**
+	 * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need
+	 * request a new one for this target channel.
+	 */
+	@Nonnull
 
 Review comment:
   Imo you don't have to add the `@Nonnull` annotation. I'm implicitly assuming that any field not marked `@Nullable` is automatically `@Nonnull`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576425#comment-16576425
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290889
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   I'm thinking about refactoring this class and splitting it into two:
   ```
   class RecordSerializer<T> {
       SerializedRecord serializeRecord(T record);
   };
   
   class SerializedRecord implements AutoCloseable {
       CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder);
   
       void close() {
           serializer.prune();
           // and code to return state (serializationBuffer) to serializer for reuse
       }
   }
   ```
   
   and usage:
   ```
   public void randomEmit(T record) throws IOException, InterruptedException {
       try (SerializedRecord serializedRecord = serializer.serializeRecord(record)) {
           copyToTarget(serializedRecord, rng.nextInt(numChannels));
       }
   }
   ```
   
   Something about the current `RecordSerializer` has always been tickling my brain: it is confusing to me and I always have to check its implementation whenever I revisit the code. Maybe with this split it would be easier to understand? But I'm not sure about this. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576422#comment-16576422
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290136
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+	private static class Broadcast<T extends IOReadableWritable> implements ChannelSelector<T> {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   1. do we need to cache `returnChannel`? Does it give any meaningful test 
execution speed up?
   2. if so, instead of using `set` and `setNumber`, just check whether 
`returnChannel.length == numberOfOutputChannels`. If not, create new one.
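   A possible shape of that suggestion as a sketch (the two-argument `selectChannels` signature matches the `ChannelSelector` interface used in this test; this is an illustration, not the committed code):

```java
private static class Broadcast<T extends IOReadableWritable> implements ChannelSelector<T> {

	private int[] returnChannel;

	@Override
	public int[] selectChannels(T record, int numberOfOutputChannels) {
		// Rebuild the cached channel array only when the number of channels changes.
		if (returnChannel == null || returnChannel.length != numberOfOutputChannels) {
			returnChannel = new int[numberOfOutputChannels];
			for (int i = 0; i < numberOfOutputChannels; i++) {
				returnChannel[i] = i;
			}
		}
		return returnChannel;
	}
}
```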


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576423#comment-16576423
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209289603
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}
 
+	/**
+	 * Tests that records are broadcast via {@link ChannelSelector} and
+	 * {@link RecordWriter#emit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+	}
+
+	/**
+	 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testBroadcastEmitRecord() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+	}
+
+	/**
+	 * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+	 * that is all the target channels can receive the whole outputs.
+	 *
+	 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+	 */
+	private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+		final int numChannels = 4;
+		final int bufferSize = 32;
+		final int numValues = 8;
+		final int serializationLength = 4;
+
+		@SuppressWarnings("unchecked")
+		final Queue<BufferConsumer>[] queues = new Queue[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			queues[i] = new ArrayDeque<>();
+		}
+
+		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+		final RecordWriter<SerializationTestType> writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
+		final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
+			new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
+		final Iterable<SerializationTestType> records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+		for (SerializationTestType record : records) {
+			serializedRecords.add(record);
+
+			if (isBroadcastEmit) {
+				writer.broadcastEmit(record);
+			} else {
+				writer.emit(record);
+			}
+		}
+
+		final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength));
+   for (int i = 0; i < numChannels; i++) {
 
 Review comment:
   Can you somehow extract the common logic of this method and `org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest#testSerializationRoundTrip(java.lang.Iterable, int, org.apache.flink.runtime.io.network.api.serialization.RecordSerializer, org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer)`? They share a lot of the core logic.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing 

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-07-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555886#comment-16555886
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-407807509
 
 
   @pnowojski , I have submitted the code for the serialization improvement as we confirmed before.
   I would appreciate your review if you have time. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-07-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555487#comment-16555487
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/6417

[FLINK-9913][runtime] Improve output serialization only once in RecordWriter

## What is the purpose of the change

*This pull request makes the output serialization happen only once for multiple target channels in `RecordWriter`, rather than serializing as many times as the number of selected channels.*

## Brief change log

  - *Only one `RecordSerializer` is created for all the output channels in 
`RecordWriter`*
  - *Restructure the processes of `emit`, `broadcastEmit`, `randomEmit` in `RecordWriter`* (see the sketch after this list)
  - *Restructure the interface methods in `RecordSerializer`*
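
A simplified sketch of the per-record flow that results from these changes (helper names such as `getBufferBuilder` and `requestNewBufferBuilder` follow the review discussion and are assumptions; flushing and error handling are omitted):

```java
private void serializeAndCopy(T record, int[] targetChannels) throws IOException, InterruptedException {
	// 1. Serialize the record into the serializer's intermediate buffer exactly once.
	serializer.serializeRecord(record);

	for (int targetChannel : targetChannels) {
		// 2. Rewind the intermediate buffer so it can be copied again for this channel.
		serializer.reset();

		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
		while (result.isFullBuffer()) {
			// The record spans more than one buffer: finish the full one and continue in a new one.
			tryFinishCurrentBufferBuilder(targetChannel);
			bufferBuilder = requestNewBufferBuilder(targetChannel);
			result = serializer.copyToBufferBuilder(bufferBuilder);
		}
	}

	// 3. Shrink the intermediate buffer again if this record made it grow unusually large.
	serializer.prune();
}
```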

## Verifying this change

This change is already covered by existing tests, such as 
*SpanningRecordSerializationTest*, etc.

It also adds new tests in `RecordWriterTest` to verify that:

  - *the serialization results are correct when emitting via `RecordWriter#emit` with `BroadcastPartitioner`*
  - *the serialization results are correct when broadcasting via `RecordWriter#broadcastEmit` directly*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-9913

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6417.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6417


commit 109ddb37abafcea28478b90cda10b965e0c399d5
Author: Zhijiang 
Date:   2018-07-25T05:45:23Z

[FLINK-9913][runtime] Improve output serialization only once in RecordWriter




> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)