[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623432#comment-16623432 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski closed pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 25d292771d0..6eebbbe88eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
 	}

 	/**
-	 * Starts serializing and copying the given record to the target buffer
-	 * (if available).
+	 * Starts serializing the given record to an intermediate data buffer.
 	 *
 	 * @param record the record to serialize
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
 	 */
-	SerializationResult addRecord(T record) throws IOException;
+	void serializeRecord(T record) throws IOException;

 	/**
-	 * Sets a (next) target buffer to use and continues writing remaining data
-	 * to it until it is full.
+	 * Copies the intermediate data serialization buffer to the given target buffer.
 	 *
 	 * @param bufferBuilder the new target buffer to use
 	 * @return how much information was written to the target buffer and
 	 *         whether this buffer is full
 	 */
-	SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
+	SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
+
+	/**
+	 * Checks to decrease the size of intermediate data serialization buffer after finishing the
+	 * whole serialization process including {@link #serializeRecord(IOReadableWritable)} and
+	 * {@link #copyToBufferBuilder(BufferBuilder)}.
+	 */
+	void prune();

 	/**
-	 * Clear and release internal state.
+	 * Supports copying an intermediate data serialization buffer to multiple target buffers
+	 * by resetting its initial position before each copying.
 	 */
-	void clear();
+	void reset();

 	/**
 	 * @return true if has some serialized data pending copying to the result {@link BufferBuilder}.

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index c4ab53f4b3a..ba2ed0133fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -20,11 +20,8 @@

 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;

-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -32,7 +29,7 @@
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link #continueWritingWithNextBufferBuilder(BufferBuilder)}.
+ * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
  *
  * @param <T> The type of the records that are serialized.
  */
@@ -50,10 +47,6 @@
 	/** Intermediate buffer for length serialization. */
 	private final ByteBuffer lengthBuffer;

-	/** Current target {@link Buffer} of the serializer. */
-	@Nullable
-	private BufferBuilder targetBuffer;
-
 	public SpanningRecordSerializer() {
 		serializationBuffer = new DataOutputSerializer(128);
@@ -66,15 +59,12 @@ public SpanningRecordSerializer() {
 	}

 	/**
-	 * Serializes the complete record to an intermediate data serialization
-	 * buffer and starts copying it to the target buffer (if available).
+	 * Serializes
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623429#comment-16623429 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-423499421

@NicoK gave me his LGTM, so I'm merging this :) Thank you @zhijiangW for the contribution and the time spent on the feature/PR.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Improve output serialization only once in RecordWriter
> ------------------------------------------------------
>
>                 Key: FLINK-9913
>                 URL: https://issues.apache.org/jira/browse/FLINK-9913
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
> Currently the {{RecordWriter}} emits output into multiple channels via {{ChannelSelector}} or broadcasts output to all channels directly. Each channel has a separate {{RecordSerializer}} for serializing outputs, which means the output will be serialized as many times as the number of selected channels.
> Since data serialization is a costly operation, we can gain a lot by serializing only once.
> I would suggest the following changes to realize it:
> # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the channels.
> # The output is serialized into the intermediate data buffer only once for different channels.
> # The intermediate serialization results are copied into different {{BufferBuilder}}s for different channels.
> An additional benefit of using a single serializer for all channels is a potentially significant reduction in heap space overhead from fewer intermediate serialization buffers (only once we got over 5MiB, these buffers were pruned back to 128B!).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
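The serialize-once design described in the issue can be sketched outside of Flink. The following is a minimal, hypothetical Java illustration (class and method names are illustrative, not Flink's actual API): the record is serialized into one intermediate buffer, and only the cheap memory copy is repeated per selected channel, with the buffer's read position reset before each copy, mirroring the proposed `reset()`/`copyToBufferBuilder()` split.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the serialize-once pattern; not Flink's API. */
class SerializeOnceSketch {

    /** Serializes the record once, then copies the bytes to every target channel. */
    static List<byte[]> emit(String record, int numChannels) {
        // 1) Serialize the record once into an intermediate buffer.
        ByteBuffer intermediate = ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));

        // 2) Copy the same serialized bytes to each selected channel,
        //    resetting the read position before each copy (cf. reset()).
        List<byte[]> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            intermediate.rewind();
            byte[] copy = new byte[intermediate.remaining()];
            intermediate.get(copy);
            channels.add(copy);
        }
        return channels;
    }
}
```

With per-channel serializers the serialization cost is paid once per channel; here it is paid once per record, and each channel only pays for a memory copy.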
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614627#comment-16614627 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-421305505

@pnowojski, I have squashed the commits addressing your concerns! :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614621#comment-16614621 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217663737

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

@@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, In
 	}

 	private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(!bufferBuilders[targetChannel].isPresent());

Review comment:
The call from `getBufferBuilder` does not need this `checkState`, but for the call from `copyFromSerializerToTargetChannel` it may be necessary to keep the check to avoid bugs. I removed it just to reduce some overhead. I will restore it. :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614604#comment-16614604 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217661048

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

@@ -113,11 +113,7 @@ public void broadcastEmit(T record) throws IOException, InterruptedException {
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		serializer.serializeRecord(record);
-
-		if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
-			serializer.prune();
-		}
+		emit(record, new int[] { rng.nextInt(numChannels) });

Review comment:
Got it.
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614582#comment-16614582 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217655518

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

@@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, In
 	}

 	private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(!bufferBuilders[targetChannel].isPresent());

Review comment:
This `checkState` was preventing some bugs and data losses. Maybe replace it with:

checkState(!bufferBuilders[targetChannel].isPresent() || bufferBuilders[targetChannel].isFinished());

?
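The guard discussed in the review, namely that a new buffer builder may only be requested for a channel whose previous builder is absent or already finished, can be illustrated with a minimal self-contained sketch. Everything below (the `BufferBuilder` stub, the exception instead of Flink's `checkState`) is a hypothetical stand-in, not Flink's actual implementation.

```java
import java.util.Optional;

/** Minimal stand-in for the requestNewBufferBuilder precondition discussed above. */
class BuilderGuardSketch {

    /** Tiny stub of a buffer builder that can be marked finished. */
    static class BufferBuilder {
        private boolean finished;
        void finish() { finished = true; }
        boolean isFinished() { return finished; }
    }

    final Optional<BufferBuilder>[] bufferBuilders;

    @SuppressWarnings("unchecked")
    BuilderGuardSketch(int numChannels) {
        bufferBuilders = new Optional[numChannels];
        for (int i = 0; i < numChannels; i++) {
            bufferBuilders[i] = Optional.empty();
        }
    }

    BufferBuilder requestNewBufferBuilder(int targetChannel) {
        // Equivalent of the suggested checkState(...): a channel may only get a
        // new builder if it has none, or its current builder is finished.
        if (bufferBuilders[targetChannel].isPresent()
                && !bufferBuilders[targetChannel].get().isFinished()) {
            throw new IllegalStateException(
                "channel " + targetChannel + " still has an unfinished builder");
        }
        BufferBuilder builder = new BufferBuilder();
        bufferBuilders[targetChannel] = Optional.of(builder);
        return builder;
    }
}
```

Requesting a second builder for a channel with an unfinished one now fails fast instead of silently dropping buffered data, which is the data-loss scenario the review comment warns about.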
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614577#comment-16614577 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217654112

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

@@ -113,11 +113,7 @@ public void broadcastEmit(T record) throws IOException, InterruptedException {
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		serializer.serializeRecord(record);
-
-		if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
-			serializer.prune();
-		}
+		emit(record, new int[] { rng.nextInt(numChannels) });

Review comment:
Wait a minute here.

emit(record, new int[] { rng.nextInt(numChannels) });

@NicoK this is an actual performance regression. Creating a single-element int array (once per record!) reduces the throughput by about 10-20% (tested during one of the hackathons). Please revert to using `copyFromSerializerToTargetChannel(rng.nextInt(numChannels))`.
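The allocation concern raised in the review, that wrapping the random channel in a fresh `int[]` per record creates garbage on the hot path, can be sketched as follows. This is a hypothetical, simplified stand-in for the `RecordWriter` (the `copies` counter exists only for illustration): the broadcast channel array is built once and reused, and the single-channel random path calls the copy routine directly instead of allocating an array.

```java
import java.util.Random;

/** Illustrative sketch of channel dispatch without per-record array allocation. */
class ChannelDispatchSketch {
    private final int numChannels;
    private final int[] broadcastChannels;   // cached once, reused for every broadcast
    private final Random rng = new Random();

    int copies;                              // counts copy calls, for illustration only

    ChannelDispatchSketch(int numChannels) {
        this.numChannels = numChannels;
        this.broadcastChannels = new int[numChannels];
        for (int i = 0; i < numChannels; i++) {
            broadcastChannels[i] = i;
        }
    }

    /** Broadcast path: reuses the cached channel array, no per-record allocation. */
    void broadcastEmit() {
        emit(broadcastChannels);
    }

    /** Random path: calls the copy routine directly, avoiding new int[] { ... }. */
    void randomEmit() {
        copyToChannel(rng.nextInt(numChannels));
    }

    private void emit(int[] targetChannels) {
        for (int channel : targetChannels) {
            copyToChannel(channel);
        }
    }

    private void copyToChannel(int channel) {
        copies++;                            // stands in for the serializer copy
    }
}
```

The design choice matches the review outcome: the generic `emit(record, int[])` path serves selectors and broadcasts, while latency-marker emission bypasses it to stay allocation-free.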
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614469#comment-16614469 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-421255951

@NicoK, I updated the code to cover your new comments. BTW, the Travis failure is unrelated to my changes.
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614464#comment-16614464 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217619644

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

@@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 		this.numChannels = writer.getNumberOfSubpartitions();

-		/*
-		 * The runtime exposes a channel abstraction for the produced results
-		 * (see {@link ChannelSelector}). Every channel has an independent
-		 * serializer.
-		 */
-		this.serializers = new SpanningRecordSerializer[numChannels];
+		this.serializer = new SpanningRecordSerializer();
 		this.bufferBuilders = new Optional[numChannels];
 		for (int i = 0; i < numChannels; i++) {
-			serializers[i] = new SpanningRecordSerializer();
 			bufferBuilders[i] = Optional.empty();
 		}
 	}

 	public void emit(T record) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
 		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-			sendToTarget(record, targetChannel);
+			copyToTarget(targetChannel);
 		}
+
+		// Make sure we don't hold onto the large intermediate serialization buffer for too long
+		serializer.prune();
 	}

 	/**
 	 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
 	 * the {@link ChannelSelector}.
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
 		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			sendToTarget(record, targetChannel);
+			copyToTarget(targetChannel);
 		}
+
+		serializer.prune();
 	}

 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		sendToTarget(record, rng.nextInt(numChannels));
-	}
+		serializer.serializeRecord(record);

-	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-		RecordSerializer serializer = serializers[targetChannel];
+		copyToTarget(rng.nextInt(numChannels));

-		SerializationResult result = serializer.addRecord(record);
+		serializer.prune();
+	}

+	private void copyToTarget(int targetChannel) throws IOException, InterruptedException {
+		// We should reset the initial position of the intermediate serialization buffer before
+		// copying, so the serialization results can be copied to multiple target buffers.
+		serializer.reset();
+
+		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 		while (result.isFullBuffer()) {
-			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-				// If this was a full record, we are done. Not breaking
-				// out of the loop at this point will lead to another
-				// buffer request before breaking out (that would not be
-				// a problem per se, but it can lead to stalls in the
-				// pipeline).
-				if (result.isFullRecord()) {
-					break;
-				}
-			}
+			tryFinishCurrentBufferBuilder(targetChannel);

Review comment:
Thanks for this further suggestion! I agree with the idea of keeping the logic in the loop simple and reducing the overhead related to the `BufferBuilder` array. I adjusted the process a bit differently from the code above. I think `bufferBuilders[targetChannel] = Optional.ofNullable(bufferBuilder)` does not need to be called on every copy, because it only matters once we enter the `while` loop. Considering the common case of small records, one `BufferBuilder` can hold many serialization
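The `while (result.isFullBuffer())` loop discussed above handles records whose serialized form is larger than one target buffer: copying continues into fresh buffers until the record is fully transferred. A minimal, hypothetical stand-in (plain byte arrays in place of Flink's `BufferBuilder`, names illustrative) looks like this:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch of copying one serialized record across fixed-size buffers. */
class SpanningCopySketch {

    /** Splits the serialized record across as many fixed-size buffers as needed. */
    static List<byte[]> copyToBuffers(byte[] serialized, int bufferSize) {
        ByteBuffer source = ByteBuffer.wrap(serialized);
        List<byte[]> buffers = new ArrayList<>();
        // Keep requesting a fresh target buffer while the previous one filled up,
        // mirroring the while (result.isFullBuffer()) loop in the diff above.
        while (source.hasRemaining()) {
            int chunk = Math.min(bufferSize, source.remaining());
            byte[] target = new byte[chunk];
            source.get(target);
            buffers.add(target);
        }
        return buffers;
    }
}
```

A 100-byte record copied into 32-byte buffers spans four buffers (32 + 32 + 32 + 4), which is exactly the spanning case the serializer's loop exists for.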
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614390#comment-16614390 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217605192

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

@@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 		this.numChannels = writer.getNumberOfSubpartitions();

-		/*
-		 * The runtime exposes a channel abstraction for the produced results
-		 * (see {@link ChannelSelector}). Every channel has an independent
-		 * serializer.
-		 */
-		this.serializers = new SpanningRecordSerializer[numChannels];
+		this.serializer = new SpanningRecordSerializer();
 		this.bufferBuilders = new Optional[numChannels];
+		this.broadcastChannels = new int[numChannels];
 		for (int i = 0; i < numChannels; i++) {
-			serializers[i] = new SpanningRecordSerializer();
+			broadcastChannels[i] = i;
 			bufferBuilders[i] = Optional.empty();
 		}
 	}

 	public void emit(T record) throws IOException, InterruptedException {
-		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-			sendToTarget(record, targetChannel);
-		}
+		emit(record, channelSelector.selectChannels(record, numChannels));
 	}

 	/**
 	 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
 	 * the {@link ChannelSelector}.
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			sendToTarget(record, targetChannel);
-		}
+		emit(record, broadcastChannels);
 	}

 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		sendToTarget(record, rng.nextInt(numChannels));
+		serializer.serializeRecord(record);
+
+		if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
+			serializer.prune();
+		}
 	}

-	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-		RecordSerializer serializer = serializers[targetChannel];
+	private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
+		boolean pruneAfterCopying = false;
+		for (int channel : targetChannels) {
+			if (copyFromSerializerToTargetChannel(channel)) {
+				pruneAfterCopying = true;
+			}
+		}

-		SerializationResult result = serializer.addRecord(record);
+		// Make sure we don't hold onto the large intermediate serialization buffer for too long
+		if (pruneAfterCopying) {
+			serializer.prune();
+		}
+	}

+	/**
+	 * @param targetChannel
+	 * @return true if the intermediate serialization buffer should be pruned
+	 */
+	private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
+		// We should reset the initial position of the intermediate serialization buffer before
+		// copying, so the serialization results can be copied to multiple target buffers.
+		serializer.reset();
+
+		boolean pruneTriggered = false;
+		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 		while (result.isFullBuffer()) {
-			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-				// If this was a full record, we are done. Not breaking
-				// out of the loop at this point will lead to another
-				// buffer request before breaking out (that would not be
-				// a problem per se, but it can lead to stalls in the
-				// pipeline).
-
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614389#comment-16614389 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217605083 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); Review comment: Yes, that makes sense. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. > As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. 
> # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. > An additional benefit by using a single serializer for all channels is that > we get a potentially significant reduction on heap space overhead from fewer > intermediate serialization buffers (only once we got over 5MiB, these buffers > were pruned back to 128B!). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
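The pruning benefit mentioned at the end of the issue description can be illustrated with a minimal sketch: the intermediate buffer grows to hold a large record and `prune()` shrinks it back afterwards, so one oversized record does not pin heap memory. The 128-byte initial capacity here mirrors the figure quoted above; Flink's actual trigger condition and thresholds differ, and `PruneSketch` is a hypothetical class, not the Flink serializer.

```java
// Illustrative sketch of pruning the intermediate serialization buffer.
public class PruneSketch {

    public static final int INITIAL_CAPACITY = 128;
    public byte[] buffer = new byte[INITIAL_CAPACITY];

    public void serializeRecord(byte[] record) {
        if (record.length > buffer.length) {
            buffer = new byte[record.length]; // grow to fit the large record
        }
        System.arraycopy(record, 0, buffer, 0, record.length);
    }

    public void prune() {
        if (buffer.length > INITIAL_CAPACITY) {
            buffer = new byte[INITIAL_CAPACITY]; // release the large buffer
        }
    }

    public static void main(String[] args) {
        PruneSketch serializer = new PruneSketch();
        serializer.serializeRecord(new byte[5 * 1024 * 1024]); // a 5 MiB record
        System.out.println(serializer.buffer.length);          // prints 5242880
        serializer.prune();
        System.out.println(serializer.buffer.length);          // prints 128
    }
}
```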
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613603#comment-16613603 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217406052 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + // Make sure we don't hold onto the large intermediate serialization buffer for too long + serializer.prune(); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + serializer.prune(); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); - } + serializer.serializeRecord(record); - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + copyToTarget(rng.nextInt(numChannels)); - SerializationResult result = serializer.addRecord(record); + serializer.prune(); + } + private void copyToTarget(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } + tryFinishCurrentBufferBuilder(targetChannel); Review comment: I guess, I was worried about the same thing as @pnowojski ... the expanded method here will actually look like this: ``` boolean pruneTriggered = false; BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { if (bufferBuilders[targetChannel].isPresent()) { bufferBuilder =
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613604#comment-16613604 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217397453 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emit(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emit(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) { + serializer.prune(); + } Review comment: code duplication: why not use this? ``` emit(record, new int[] { rng.nextInt(numChannels) }); ```
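The de-duplication suggested in this comment, routing `randomEmit` through the shared `emit(record, channels)` path with a one-element channel array, can be sketched in isolation. `RandomEmitSketch` and its fields are illustrative stand-ins, not the actual Flink classes:

```java
import java.util.Random;

// Sketch of the suggested de-duplication: randomEmit reuses the general
// emit(record, channels) path instead of having its own copy/prune sequence.
public class RandomEmitSketch {

    private final Random rng = new Random();
    private final int numChannels;
    public int lastChannel = -1;

    public RandomEmitSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    public void randomEmit(String record) {
        // One code path for single-channel and multi-channel emission.
        emit(record, new int[] { rng.nextInt(numChannels) });
    }

    private void emit(String record, int[] targetChannels) {
        for (int channel : targetChannels) {
            lastChannel = channel; // stand-in for copyFromSerializerToTargetChannel
        }
    }

    public static void main(String[] args) {
        RandomEmitSketch writer = new RandomEmitSketch(4);
        writer.randomEmit("latency-marker");
        System.out.println("sent to channel " + writer.lastChannel);
    }
}
```

One trade-off of this form is the per-call `int[]` allocation; whether that matters on the hot path is the kind of question the thread defers to benchmarks.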
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613602#comment-16613602 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217404839 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,95 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emit(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emit(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) { + serializer.prune(); + } } - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + private void emit(T record, int[] targetChannels) throws IOException, InterruptedException { + serializer.serializeRecord(record); + + boolean pruneAfterCopying = false; + for (int channel : targetChannels) { + if (copyFromSerializerToTargetChannel(channel)) { + pruneAfterCopying = true; + } + } - SerializationResult result = serializer.addRecord(record); + // Make sure we don't hold onto the large intermediate serialization buffer for too long + if (pruneAfterCopying) { + serializer.prune(); + } + } + /** +* @param targetChannel +* @return true if the intermediate serialization buffer should be pruned +*/ + private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + boolean pruneTriggered = false; + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). -
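The emit path shown in this hunk, a precomputed `broadcastChannels` identity array plus a single `emit(record, targetChannels)` loop that prunes at most once after copying, can be sketched as follows. The logging and the always-true prune trigger are stand-ins for the real serializer/buffer machinery, and the class name is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the refactored emit path; not the actual Flink code.
public class EmitPathSketch {

    private final int[] broadcastChannels;

    // Records the order of operations so the serialize-once behavior is visible.
    public final List<String> log = new ArrayList<>();

    public EmitPathSketch(int numChannels) {
        // Precompute [0, 1, ..., n-1] once so broadcastEmit needs no per-record setup.
        this.broadcastChannels = new int[numChannels];
        for (int i = 0; i < numChannels; i++) {
            broadcastChannels[i] = i;
        }
    }

    public void broadcastEmit(String record) {
        emit(record, broadcastChannels);
    }

    private void emit(String record, int[] targetChannels) {
        log.add("serialize:" + record);          // serialization happens once ...
        boolean pruneAfterCopying = false;
        for (int channel : targetChannels) {
            log.add("copy->" + channel);         // ... copying once per channel
            pruneAfterCopying |= copyTriggeredPrune(channel);
        }
        if (pruneAfterCopying) {
            log.add("prune");                    // shrink the intermediate buffer once
        }
    }

    private boolean copyTriggeredPrune(int channel) {
        // Stand-in: in the real writer this depends on the serializer/buffer state.
        return true;
    }

    public static void main(String[] args) {
        EmitPathSketch writer = new EmitPathSketch(3);
        writer.broadcastEmit("watermark");
        System.out.println(writer.log);
        // prints [serialize:watermark, copy->0, copy->1, copy->2, prune]
    }
}
```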
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613601#comment-16613601 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r217415039 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); Review comment: I meant using ``` final RecordWriter writer = isBroadcastEmit ? new RecordWriter<>(partitionWriter) : new RecordWriter<>(partitionWriter, new Broadcast<>()); ``` This would also check that `broadcastEmit()` does not rely on a broadcasting channel selector.
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611470#comment-16611470 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#issuecomment-420486898 Thanks for that. I will also try my best to monitor the performance changes for these cases later. :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606668#comment-16606668 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#issuecomment-419312269 @pnowojski, I updated the code per your comments and squashed the changes into the last commit. Looking forward to your benchmark results! :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16606651#comment-16606651 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r215832704 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { * Marks the current {@link BufferBuilder} as finished and clears the state for next one. */ private void tryFinishCurrentBufferBuilder(int targetChannel) { - if (bufferBuilders[targetChannel].isPresent()) { - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + Optional bufferBuilderOpt = bufferBuilders[targetChannel]; Review comment: I have not seen throughput improvements for this change either, so I will revert it.
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605753#comment-16605753 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r215611369 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { * Marks the current {@link BufferBuilder} as finished and clears the state for next one. */ private void tryFinishCurrentBufferBuilder(int targetChannel) { - if (bufferBuilders[targetChannel].isPresent()) { - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + Optional bufferBuilderOpt = bufferBuilders[targetChannel]; Review comment: Rename to `bufferBuilder` (marking type in variable name is not the best practice). Have you seen throughput improvements by introducing this local variable? If not, maybe revert the change? FYI: When I was writing this code, I didn't see any performance improvement (and I was testing this exact change). Removing one extra CPU cache read (the second `bufferBuilders[targetChannel]` access will either be optimised out or it will be a read from CPU caches/registers) usually hardly matters compared to taking locks :(
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605751#comment-16605751 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215615035

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
## @@ -89,77 +88,101 @@

```
 public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 	this.numChannels = writer.getNumberOfSubpartitions();
-	/*
-	 * The runtime exposes a channel abstraction for the produced results
-	 * (see {@link ChannelSelector}). Every channel has an independent
-	 * serializer.
-	 */
-	this.serializers = new SpanningRecordSerializer[numChannels];
+	this.serializer = new SpanningRecordSerializer();
 	this.bufferBuilders = new Optional[numChannels];
+	this.broadcastChannels = new int[numChannels];
 	for (int i = 0; i < numChannels; i++) {
-		serializers[i] = new SpanningRecordSerializer();
+		broadcastChannels[i] = i;
 		bufferBuilders[i] = Optional.empty();
 	}
 }

 public void emit(T record) throws IOException, InterruptedException {
-	for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-		sendToTarget(record, targetChannel);
-	}
+	emitToTargetChannels(record, channelSelector.selectChannels(record, numChannels));
 }

 /**
  * This is used to broadcast Streaming Watermarks in-band with records. This ignores
  * the {@link ChannelSelector}.
  */
 public void broadcastEmit(T record) throws IOException, InterruptedException {
-	for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-		sendToTarget(record, targetChannel);
-	}
+	emitToTargetChannels(record, broadcastChannels);
 }

 /**
  * This is used to send LatencyMarks to a random target channel.
  */
 public void randomEmit(T record) throws IOException, InterruptedException {
-	sendToTarget(record, rng.nextInt(numChannels));
+	serializer.serializeRecord(record);
+
+	if (copyToTargetBuffers(rng.nextInt(numChannels))) {
+		serializer.prune();
+	}
 }

-private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-	RecordSerializer serializer = serializers[targetChannel];
+private void emitToTargetChannels(T record, int[] targetChannels) throws IOException, InterruptedException {
+	serializer.serializeRecord(record);
+
+	boolean pruneAfterCopying = false;
+	for (int channel : targetChannels) {
+		if (copyToTargetBuffers(channel)) {
+			pruneAfterCopying = true;
+		}
+	}

-	SerializationResult result = serializer.addRecord(record);
+	// Make sure we don't hold onto the large intermediate serialization buffer for too long
+	if (pruneAfterCopying) {
+		serializer.prune();
+	}
+}

+/**
+ * Copies the intermediate serialization buffer to the BufferBuilder of the target channel, and
+ * also checks whether to prune the intermediate buffer if the target BufferBuilder is filled and
+ * the record is complete.
+ *
+ * @param targetChannel the target channel to get the BufferBuilder for
+ * @return true if the intermediate serialization buffer should be pruned
+ */
+private boolean copyToTargetBuffers(int targetChannel) throws IOException, InterruptedException {
```

Review comment: Third time I'm looking at this PR, and for the third time I had to think for a minute about what this method does. I keep forgetting that `serializer` is a class field and that this method copies from it. Maybe rename it to `copyFromSerializerToTargetChannel`? Imo the rename would allow us to drop most of the javadoc and simplify it to just:
```
/**
 * @param targetChannel
 * @return true if the intermediate serialization buffer should be pruned
 */
```
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
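For readers skimming the thread, the emit path under review (serialize once, copy per channel, prune afterwards) can be modeled with plain JDK types. This is an illustrative sketch only: `EmitModel`, the `ByteArrayOutputStream` "channels", and the null-out prune are stand-ins, not Flink's actual classes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class EmitModel {
    // Stand-in for the single, shared SpanningRecordSerializer field.
    static ByteBuffer intermediate;

    // Serialize the record exactly once into the intermediate buffer.
    static void serializeRecord(String record) {
        intermediate = ByteBuffer.wrap(record.getBytes());
    }

    // Models copyFromSerializerToTargetChannel: rewind the shared buffer and copy
    // it into the per-channel sink; return true if pruning is advisable afterwards.
    static boolean copyFromSerializerToTargetChannel(ByteArrayOutputStream channel) {
        intermediate.rewind();
        byte[] tmp = new byte[intermediate.remaining()];
        intermediate.get(tmp);
        channel.write(tmp, 0, tmp.length);
        return true;
    }

    static void emitToTargetChannels(String record, ByteArrayOutputStream[] targets) {
        serializeRecord(record);                  // serialized once, not per channel
        boolean pruneAfterCopying = false;
        for (ByteArrayOutputStream channel : targets) {
            if (copyFromSerializerToTargetChannel(channel)) {
                pruneAfterCopying = true;
            }
        }
        if (pruneAfterCopying) {
            intermediate = null;                  // models serializer.prune()
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream[] channels = { new ByteArrayOutputStream(), new ByteArrayOutputStream() };
        emitToTargetChannels("watermark", channels);
        System.out.println(channels[0] + " / " + channels[1]); // same bytes land in both channels
    }
}
```

The rename suggested in the review reads naturally here: the method name alone says the data flows from the shared serializer field to one target channel.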
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605750#comment-16605750 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215611481

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
## @@ -197,40 +214,39 @@

```
 public void setMetricGroup(TaskIOMetricGroup metrics) {
 /**
  * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
  */
 private void tryFinishCurrentBufferBuilder(int targetChannel) {
-	if (bufferBuilders[targetChannel].isPresent()) {
-		BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
+	Optional bufferBuilderOpt = bufferBuilders[targetChannel];
+	if (bufferBuilderOpt.isPresent()) {
 		bufferBuilders[targetChannel] = Optional.empty();
-		numBytesOut.inc(bufferBuilder.finish());
+		numBytesOut.inc(bufferBuilderOpt.get().finish());
 		numBuffersOut.inc();
 	}
 }

 /**
  * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need
  * request a new one for this target channel.
  */
-@Nonnull
 private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-	if (bufferBuilders[targetChannel].isPresent()) {
-		return bufferBuilders[targetChannel].get();
+	Optional bufferBuilderOpt = bufferBuilders[targetChannel];
+	if (bufferBuilderOpt.isPresent()) {
+		return bufferBuilderOpt.get();
 	} else {
 		return requestNewBufferBuilder(targetChannel);
 	}
 }

-@Nonnull
 private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
 	checkState(!bufferBuilders[targetChannel].isPresent());
-
 	BufferBuilder bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking();
 	bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
 	targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
 	return bufferBuilder;
 }

 private void closeBufferBuilder(int targetChannel) {
-	if (bufferBuilders[targetChannel].isPresent()) {
-		bufferBuilders[targetChannel].get().finish();
+	Optional bufferBuilderOpt = bufferBuilders[targetChannel];
```

Review comment: ditto: rename or inline

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Improve output serialization only once in RecordWriter
> ------------------------------------------------------
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.6.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Currently the {{RecordWriter}} emits output into multiple channels via a {{ChannelSelector}} or broadcasts output to all channels directly. Each channel has a separate {{RecordSerializer}} for serializing outputs, which means the output will be serialized as many times as the number of selected channels.
> Since data serialization is a costly operation, we can gain a lot by serializing only once.
> I would suggest the following changes for realizing it:
> # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the channels.
> # The output is serialized into the intermediate data buffer only once for different channels.
> # The intermediate serialization results are copied into different {{BufferBuilder}}s for different channels.
> An additional benefit of using a single serializer for all channels is a potentially significant reduction in heap space overhead from fewer intermediate serialization buffers (only once we got over 5MiB, these buffers were pruned back to 128B!).

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
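The three proposed changes in the issue description can be sketched against the reworked `RecordSerializer` contract (`serializeRecord` / `reset` / `copyToBufferBuilder`). The class below is a self-contained toy, with a plain `ByteBuffer` standing in for Flink's `BufferBuilder`; the names mirror the interface discussed above, but nothing here is Flink's actual implementation.

```java
import java.nio.ByteBuffer;

public class SketchSerializer {
    enum SerializationResult { FULL_RECORD, PARTIAL_RECORD_MEMORY_SEGMENT_FULL }

    // The single intermediate data buffer shared by all channels.
    private ByteBuffer dataBuffer = ByteBuffer.allocate(0);

    // serializeRecord: write the record once into the intermediate buffer.
    void serializeRecord(byte[] record) {
        dataBuffer = ByteBuffer.wrap(record.clone());
    }

    // reset: rewind so the same serialized bytes can be copied to another channel.
    void reset() {
        dataBuffer.rewind();
    }

    // copyToBufferBuilder: copy as much as fits; report whether the record is complete.
    SerializationResult copyToBufferBuilder(ByteBuffer target) {
        while (dataBuffer.hasRemaining() && target.hasRemaining()) {
            target.put(dataBuffer.get());
        }
        return dataBuffer.hasRemaining()
            ? SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL
            : SerializationResult.FULL_RECORD;
    }

    public static void main(String[] args) {
        SketchSerializer s = new SketchSerializer();
        s.serializeRecord("abcdef".getBytes());     // serialized once...
        ByteBuffer ch0 = ByteBuffer.allocate(16);
        ByteBuffer ch1 = ByteBuffer.allocate(16);
        s.reset();
        System.out.println(s.copyToBufferBuilder(ch0));
        s.reset();                                  // ...then copied to each channel
        System.out.println(s.copyToBufferBuilder(ch1));
    }
}
```

The `reset()` between the two copies is the crux of change #3: the serialization work is not repeated, only the cheap byte copy is.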
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605754#comment-16605754 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215613344

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
## @@ -89,77 +88,101 @@

```
+	private void emitToTargetChannels(T record, int[] targetChannels) throws IOException, InterruptedException {
```

Review comment: maybe rename just to `emit`?
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605752#comment-16605752 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215611435

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
## @@ -197,40 +214,39 @@

```
 private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-	if (bufferBuilders[targetChannel].isPresent()) {
-		return bufferBuilders[targetChannel].get();
+	Optional bufferBuilderOpt = bufferBuilders[targetChannel];
```

Review comment: ditto: rename or inline

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
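The get-or-request pattern reviewed above (reuse a partially filled `BufferBuilder` if present, otherwise request a fresh one and register it) boils down to the following. `StringBuilder` is a stand-in for `BufferBuilder` and a `List<Optional<...>>` replaces Flink's `Optional[]` field, purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ChannelBuffers {
    private final List<Optional<StringBuilder>> bufferBuilders = new ArrayList<>();

    ChannelBuffers(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            bufferBuilders.add(Optional.empty());
        }
    }

    // Reuse the builder left over from last time if one exists; otherwise request
    // a new one. Mirrors getBufferBuilder(targetChannel) in the diff above.
    StringBuilder getBufferBuilder(int targetChannel) {
        Optional<StringBuilder> current = bufferBuilders.get(targetChannel);
        return current.isPresent() ? current.get() : requestNewBufferBuilder(targetChannel);
    }

    private StringBuilder requestNewBufferBuilder(int targetChannel) {
        // checkState(...) equivalent: requesting while one is present is a bug.
        if (bufferBuilders.get(targetChannel).isPresent()) {
            throw new IllegalStateException("builder already present for channel " + targetChannel);
        }
        StringBuilder fresh = new StringBuilder();
        bufferBuilders.set(targetChannel, Optional.of(fresh));
        return fresh;
    }

    public static void main(String[] args) {
        ChannelBuffers buffers = new ChannelBuffers(2);
        StringBuilder first = buffers.getBufferBuilder(0);
        StringBuilder again = buffers.getBufferBuilder(0);
        System.out.println(first == again); // the partially filled builder is reused
    }
}
```

The "rename or inline" comments apply directly here: the local `current` either deserves a descriptive name or can be inlined into the ternary.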
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598589#comment-16598589 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-417629346

@NicoK @pnowojski FYI, I have updated the code covering the above comments.
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597293#comment-16597293 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW edited a comment on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-417269629

@pnowojski Thank you for reminding me that `tryFinishCurrentBufferBuilder()` will also be called by `broadcastEvent()`, so I prefer to keep the current mode. :)
I did run the benchmark on my local machine and it is actually not very stable sometimes. I will try to update the latest code covering all the above modifications over the weekend. :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597292#comment-16597292 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-417269629

Thank you for reminding me that `tryFinishCurrentBufferBuilder()` will also be called by `broadcastEvent()`, so I prefer to keep the current mode. :)
I did run the benchmark on my local machine and it is actually not very stable sometimes. I will try to update the latest code covering all the above modifications over the weekend. :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597206#comment-16597206 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213949310

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
## @@ -87,62 +86,71 @@

```
 public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele
 	this.numChannels = writer.getNumberOfSubpartitions();
-	/*
-	 * The runtime exposes a channel abstraction for the produced results
-	 * (see {@link ChannelSelector}). Every channel has an independent
-	 * serializer.
-	 */
-	this.serializers = new SpanningRecordSerializer[numChannels];
+	this.serializer = new SpanningRecordSerializer();
 	this.bufferBuilders = new Optional[numChannels];
 	for (int i = 0; i < numChannels; i++) {
-		serializers[i] = new SpanningRecordSerializer();
 		bufferBuilders[i] = Optional.empty();
 	}
 }

 public void emit(T record) throws IOException, InterruptedException {
+	serializer.serializeRecord(record);
+
 	for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-		sendToTarget(record, targetChannel);
+		copyToTarget(targetChannel);
 	}
+
+	// Make sure we don't hold onto the large intermediate serialization buffer for too long
+	serializer.prune();
 }

 /**
  * This is used to broadcast Streaming Watermarks in-band with records. This ignores
  * the {@link ChannelSelector}.
  */
 public void broadcastEmit(T record) throws IOException, InterruptedException {
+	serializer.serializeRecord(record);
+
 	for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-		sendToTarget(record, targetChannel);
+		copyToTarget(targetChannel);
 	}
+
+	serializer.prune();
 }

 /**
  * This is used to send LatencyMarks to a random target channel.
  */
 public void randomEmit(T record) throws IOException, InterruptedException {
-	sendToTarget(record, rng.nextInt(numChannels));
-}
+	serializer.serializeRecord(record);

-private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-	RecordSerializer serializer = serializers[targetChannel];
+	copyToTarget(rng.nextInt(numChannels));

-	SerializationResult result = serializer.addRecord(record);
+	serializer.prune();
+}

+private void copyToTarget(int targetChannel) throws IOException, InterruptedException {
+	// We should reset the initial position of the intermediate serialization buffer before
+	// copying, so the serialization results can be copied to multiple target buffers.
+	serializer.reset();
+
+	BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+	SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 	while (result.isFullBuffer()) {
-		if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-			// If this was a full record, we are done. Not breaking
-			// out of the loop at this point will lead to another
-			// buffer request before breaking out (that would not be
-			// a problem per se, but it can lead to stalls in the
-			// pipeline).
-			if (result.isFullRecord()) {
-				break;
-			}
+		tryFinishCurrentBufferBuilder(targetChannel);
```

Review comment: When I was reviewing this part last time, I was only concerned that both `getBufferBuilder(targetChannel)` and `tryFinishCurrentBufferBuilder(targetChannel)` access `bufferBuilders[targetChannel]` twice. However, the performance penalty (if any) shouldn't be important, and I liked the new code more :)
Regarding `finishCurrentBufferBuilder`: `tryFinishCurrentBufferBuilder` is currently used in one more place (`broadcastEvent()`), so we would need to add there an `if` check preceding a call
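The `while (result.isFullBuffer())` loop above, where one record spans several fixed-size buffers, can be modeled in isolation like this. Small `ByteBuffer`s stand in for the requested `BufferBuilder`s; this is a sketch of the control flow, not the PR's code.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SpanningCopyModel {
    // Copies one serialized record across as many fixed-size target buffers as
    // needed, mirroring the loop in the diff above.
    static int buffersNeeded(byte[] record, int segmentSize) {
        ByteBuffer serialized = ByteBuffer.wrap(record);         // intermediate serialization buffer
        List<ByteBuffer> finished = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(segmentSize);   // getBufferBuilder(...)
        while (serialized.hasRemaining()) {
            while (serialized.hasRemaining() && current.hasRemaining()) {
                current.put(serialized.get());                   // copyToBufferBuilder(...)
            }
            if (!current.hasRemaining() && serialized.hasRemaining()) {
                finished.add(current);                           // tryFinishCurrentBufferBuilder(...)
                current = ByteBuffer.allocate(segmentSize);      // requestNewBufferBuilder(...)
            }
        }
        finished.add(current);                                   // last, partially filled buffer
        return finished.size();
    }

    public static void main(String[] args) {
        // A 10-byte record with a 4-byte segment size spans three buffers.
        System.out.println(buffersNeeded("0123456789".getBytes(), 4));
    }
}
```

The double access to `bufferBuilders[targetChannel]` that the reviewer mentions corresponds to the `finished.add` / `allocate` pair here happening per loop iteration.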
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595131#comment-16595131 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-416629286

Thanks for your reviews @NicoK. Sorry for the late updates to this PR; I have been a little busy recently, also with the benchmark results.
For my own broadcast benchmark, this change gains an obvious improvement. But for non-broadcast cases, the throughput of `StreamNetworkThroughputBenchmarkExecutor` seems to have decreased a bit. After I adjusted the code to keep the same `pruneBuffer()` process as before, the results were somewhat better, but still slightly (sometimes 1%) below the baseline.
So I guess another reason is that in the past the `RecordSerializer` maintained the `BufferBuilder` internally and kept copying multiple serialization results into it until it was full. Now, for each record we have to get the `BufferBuilder` from the arrays in `RecordWriter` and then pass it to the `RecordSerializer`. This is the key difference and overhead, because the `RecordSerializer` is now stateless. I am still trying to improve other parts to compensate for this loss.
I am trying to update this PR soon based on all the above comments!
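The pruning this benchmark discussion keeps returning to amounts to a conditional shrink of the intermediate buffer after copying. The sketch below borrows the 128B/5MiB figures from the issue description; the class itself is a stand-alone model, not Flink's actual serializer internals.

```java
public class PrunableBuffer {
    static final int INITIAL_SIZE = 128;                 // 128 B starting size, per the issue description
    static final int PRUNE_THRESHOLD = 5 * 1024 * 1024;  // 5 MiB, per the issue description

    byte[] buffer = new byte[INITIAL_SIZE];

    // Grow-on-demand (doubling) while serializing a record.
    void ensureCapacity(int needed) {
        int size = buffer.length;
        while (size < needed) {
            size *= 2;
        }
        if (size != buffer.length) {
            buffer = new byte[size];
        }
    }

    // prune(): once the record has been copied out to all channels, shrink an
    // oversized buffer back so one huge record does not pin megabytes of heap.
    void prune() {
        if (buffer.length > PRUNE_THRESHOLD) {
            buffer = new byte[INITIAL_SIZE];
        }
    }

    public static void main(String[] args) {
        PrunableBuffer b = new PrunableBuffer();
        b.ensureCapacity(6 * 1024 * 1024);   // a 6 MiB record grows the buffer...
        b.prune();                           // ...and prune() shrinks it back
        System.out.println(b.buffer.length); // 128
    }
}
```

With one shared serializer there is a single such buffer per `RecordWriter` instead of one per channel, which is where the heap-space saving in the issue description comes from.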
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595110#comment-16595110 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213343581

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
## @@ -87,62 +86,71 @@

```
+		tryFinishCurrentBufferBuilder(targetChannel);
```

Review comment: You pointed out a good question!
1. Considering `tryFinishCurrentBufferBuilder()`, the logic is somewhat different from before. In the past, the buffer builder could be empty when calling `tryFinishCurrentBufferBuilder()`, which then returned a boolean value to indicate the result. But now we know the buffer builder is always present when calling it, so changing it to `finishCurrentBufferBuilder()` seems more appropriate, and adding the check code instead, as follows: ``` private
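zhijiang's truncated suggestion above — turning the "try" into a checked finish because the caller now guarantees presence — might look roughly like the following reconstruction. A `HashMap` and `StringBuilder` stand in for the builder array and `BufferBuilder`; this is an assumption for illustration, not the actual commit.

```java
import java.util.HashMap;
import java.util.Map;

public class FinishCheckModel {
    // Stand-in for bufferBuilders[targetChannel]; StringBuilder plays BufferBuilder.
    static final Map<Integer, StringBuilder> bufferBuilders = new HashMap<>();

    // The caller guarantees a builder is present, so absence is a bug:
    // fail fast instead of silently returning, as the old "try" variant did.
    static int finishCurrentBufferBuilder(int targetChannel) {
        StringBuilder builder = bufferBuilders.remove(targetChannel);
        if (builder == null) {
            throw new IllegalStateException("no BufferBuilder for channel " + targetChannel);
        }
        return builder.length();   // models numBytesOut.inc(bufferBuilder.finish())
    }

    public static void main(String[] args) {
        bufferBuilders.put(0, new StringBuilder("abcd"));
        System.out.println(finishCurrentBufferBuilder(0)); // 4 bytes "finished"
    }
}
```

The trade-off discussed in the thread is exactly this: `broadcastEvent()` may legitimately see an absent builder, so it would need its own presence check before calling the strict variant.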
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595115#comment-16595115 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r213345034 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ## @@ -169,25 +167,26 @@ private void test(Util.MockRecords records, int segmentSize) throws Exception { // - - serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); - + BufferBuilder bufferBuilder = createBufferBuilder(segmentSize); int numBytes = 0; for (SerializationTestType record : records) { - RecordSerializer.SerializationResult result = serializer.addRecord(record); + serializer.serializeRecord(record); + RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); numBytes += record.length() + serializationOverhead; if (numBytes < segmentSize) { Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); } else if (numBytes == segmentSize) { Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result); - serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); + bufferBuilder = createBufferBuilder(segmentSize); numBytes = 0; } else { Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); while (result.isFullBuffer()) { numBytes -= segmentSize; - result = serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); + bufferBuilder = createBufferBuilder(segmentSize); + result = serializer.copyToBufferBuilder(bufferBuilder); Review comment: make sense This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. > As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. > # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. > An additional benefit by using a single serializer for all channels is that > we get a potentially significant reduction on heap space overhead from fewer > intermediate serialization buffers (only once we got over 5MiB, these buffers > were pruned back to 128B!). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
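The three-step proposal in the issue description — one serializer, serialize once into an intermediate buffer, copy the result per channel — can be sketched as below. This is a simplified, hypothetical model using plain byte arrays instead of Flink's `BufferBuilder`; all class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposal: one serializer, one intermediate
// buffer, N per-channel copies. Not the real RecordWriter.
public class SerializeOnceSketch {

    private ByteBuffer dataBuffer; // intermediate serialization buffer

    // Step 2: serialize the record into the intermediate buffer exactly once.
    public void serializeRecord(String record) {
        dataBuffer = ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));
    }

    // Step 3: copy the serialized bytes into one target buffer; the read
    // position is rewound first so the same bytes can serve several targets.
    public byte[] copyToTarget() {
        dataBuffer.rewind();
        byte[] target = new byte[dataBuffer.remaining()];
        dataBuffer.get(target);
        return target;
    }

    // Step 1's consequence: a single serializer serves all selected channels.
    public List<byte[]> emit(String record, int[] selectedChannels) {
        serializeRecord(record);
        List<byte[]> perChannel = new ArrayList<>();
        for (int ignored : selectedChannels) {
            perChannel.add(copyToTarget());
        }
        return perChannel;
    }
}
```

The point of the rewind in `copyToTarget()` is exactly what the PR's `reset()` provides: without it, only the first selected channel would receive the serialized bytes.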
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595114#comment-16595114 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r213349150 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); Review comment: Maybe I am not getting your point correctly. I just want to verify the two different interface methods on the same `RecordWriter` instance, that is, `RecordWriter#emit()` and `RecordWriter#broadcastEmit()`, in two separate cases, because both methods are affected by this serialization improvement. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595111#comment-16595111 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r213344540 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ## @@ -40,25 +41,25 @@ @Test public void testHasSerializedData() throws IOException { - final int segmentSize = 16; - final SpanningRecordSerializer serializer = new SpanningRecordSerializer<>(); final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT); Assert.assertFalse(serializer.hasSerializedData()); - serializer.addRecord(randomIntRecord); + serializer.serializeRecord(randomIntRecord); Assert.assertTrue(serializer.hasSerializedData()); - serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); + final BufferBuilder bufferBuilder1 = createBufferBuilder(16); + serializer.copyToBufferBuilder(bufferBuilder1); Assert.assertFalse(serializer.hasSerializedData()); - serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8)); - - serializer.addRecord(randomIntRecord); + final BufferBuilder bufferBuilder2 = createBufferBuilder(8); + serializer.serializeRecord(randomIntRecord); Review comment: good idea This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
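The `hasSerializedData()` contract exercised by the test above can be sketched as follows: true once a record has been serialized, false again once everything has been copied out. This is a simplified, hypothetical model, not the real `SpanningRecordSerializer`.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the hasSerializedData() state transitions.
public class HasDataSketch {

    private ByteBuffer dataBuffer = ByteBuffer.allocate(0);

    public void serializeRecord(byte[] record) {
        dataBuffer = ByteBuffer.wrap(record.clone());
    }

    // Copies up to targetCapacity bytes to a target buffer; data stays
    // pending if the target is smaller than what remains.
    public int copyToBufferBuilder(int targetCapacity) {
        int n = Math.min(targetCapacity, dataBuffer.remaining());
        dataBuffer.position(dataBuffer.position() + n);
        return n;
    }

    // True while serialized bytes are still pending a copy to some target.
    public boolean hasSerializedData() {
        return dataBuffer.hasRemaining();
    }
}
```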
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595113#comment-16595113 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r213346998 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); + final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( + new String[]{ tempFolder.getRoot().getAbsolutePath() }); + + final ArrayDeque serializedRecords = new ArrayDeque<>(); + final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT); + for (SerializationTestType record : records) { + serializedRecords.add(record); + + if (isBroadcastEmit) { + writer.broadcastEmit(record); + } else { + writer.emit(record); + } + } + + final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength)); + for (int i = 0; i < numChannels; i++) { Review comment: yes, i will try This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
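The buffer-count arithmetic in the test above (`numValues / (bufferSize / (4 + serializationLength))`) can be checked in isolation. Assuming the literal `4` is a per-record length prefix (an assumption on my part; the test itself does not name it), with the test's constants each channel needs two buffers:

```java
// Worked check of the requiredBuffers arithmetic; plain arithmetic, no Flink
// types. The 4-byte length prefix is an assumed interpretation of the "4"
// in the test's expression.
public class RequiredBuffersSketch {

    public static int requiredBuffers(int numValues, int bufferSize, int serializationLength) {
        int lengthPrefix = 4; // assumed per-record framing overhead
        int recordsPerBuffer = bufferSize / (lengthPrefix + serializationLength);
        return numValues / recordsPerBuffer;
    }
}
```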
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16595112#comment-16595112 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r213352672 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException { } } + /** +* Broadcast channel selector that selects all the output channels. +*/ + private static class Broadcast implements ChannelSelector { + + private int[] returnChannel; + boolean set; Review comment: Yes, I just copied the code from `BroadcastPartitioner`, and I will simplify it in a hotfix commit later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593710#comment-16593710 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK edited a comment on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#issuecomment-416236083 I don't quite get the argument with the `pruneBuffer()` call having this much overhead...at least for small records since its implementation only prunes if the buffer is larger than 5MiB: ``` public void pruneBuffer() { if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) { ... ``` well, actually, this brings it down to the overhead from `org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer#prune`: ``` @Override public void prune() { serializationBuffer.pruneBuffer(); dataBuffer = serializationBuffer.wrapAsByteBuffer(); } ``` but to be honest, `serializationBuffer.wrapAsByteBuffer();` also just sets two position numbers -> if we are that sensitive to small changes, we indeed should think about optimising this one call to `tryFinishCurrentBufferBuilder()` which I mentioned above(?) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
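The threshold-guarded pruning discussed above can be sketched as follows: the intermediate buffer is only shrunk back when it has grown past a threshold, so per-record `prune()` calls on small records are effectively free. The 5 MiB threshold and 128 B initial size come from the discussion; the class itself is an illustrative stand-in, not Flink's `DataOutputSerializer`.

```java
import java.util.Arrays;

// Hypothetical sketch of threshold-based buffer pruning.
public class PruneSketch {

    public static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024; // 5 MiB
    public static final int INITIAL_SIZE = 128;                       // 128 B

    public byte[] buffer = new byte[INITIAL_SIZE];

    // Grows the buffer (at least doubling) to hold a large record.
    public void ensureCapacity(int needed) {
        if (buffer.length < needed) {
            buffer = Arrays.copyOf(buffer, Math.max(needed, buffer.length * 2));
        }
    }

    // Cheap no-op for small buffers; only oversized buffers pay the reallocation.
    public void pruneBuffer() {
        if (buffer.length > PRUNE_BUFFER_THRESHOLD) {
            buffer = new byte[INITIAL_SIZE];
        }
    }
}
```

This is why calling `prune()` after every emit is cheap in the common case: for records below the threshold, the method reduces to a single length comparison.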
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593696#comment-16593696 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#issuecomment-416236083 I don't quite get the argument with the `pruneBuffer()` call having this much overhead...at least for small records since its implementation only prunes if the buffer is larger than 5MiB: ``` public void pruneBuffer() { if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) { ... ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593688#comment-16593688 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r212970682 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ## @@ -106,17 +102,19 @@ public boolean equals(Object obj) { } }; - RecordSerializer.SerializationResult result = serializer.addRecord(emptyRecord); - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); + serializer.serializeRecord(emptyRecord); + Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1)); - result = serializer.addRecord(emptyRecord); - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); + serializer.serializeRecord(emptyRecord); + Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1)); - result = serializer.addRecord(emptyRecord); - Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); + serializer.serializeRecord(emptyRecord); Review comment: why don't you use `serializer.reset()` here? (serialize only once as in production code) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593681#comment-16593681 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r212951771 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + // Make sure we don't hold onto the large intermediate serialization buffer for too long + serializer.prune(); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + serializer.prune(); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); - } + serializer.serializeRecord(record); - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + copyToTarget(rng.nextInt(numChannels)); - SerializationResult result = serializer.addRecord(record); + serializer.prune(); + } + private void copyToTarget(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } + tryFinishCurrentBufferBuilder(targetChannel); Review comment: actually, here, we do not only know that the buffer builder is present, we also already have its reference (in contrast to `tryFinishCurrentBufferBuilder()`) and don't need to update the `bufferBuilders` field until after the `while` loop - I'm not sure whether this is worth optimising, though (@pnowojski?) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific
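The `while (result.isFullBuffer())` pattern discussed above — keep requesting fresh buffers while it is the *buffer*, not the record, that is full — can be sketched as below. The `SerializationResult` names mirror the PR, but the types are simplified stand-ins (plain `ByteBuffer`s instead of `BufferBuilder`s).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of copying one serialized record across N fixed-size buffers.
public class CopyLoopSketch {

    public enum SerializationResult {
        FULL_RECORD,                        // record done, buffer has room left
        FULL_RECORD_MEMORY_SEGMENT_FULL,    // record done, buffer exactly full
        PARTIAL_RECORD_MEMORY_SEGMENT_FULL; // buffer full, record continues

        public boolean isFullBuffer() { return this != FULL_RECORD; }
    }

    private ByteBuffer dataBuffer;

    public void serializeRecord(byte[] record) {
        dataBuffer = ByteBuffer.wrap(record.clone());
    }

    // Copies as much pending data as fits and reports buffer/record state.
    public SerializationResult copyToBufferBuilder(ByteBuffer target) {
        int n = Math.min(target.remaining(), dataBuffer.remaining());
        for (int i = 0; i < n; i++) {
            target.put(dataBuffer.get());
        }
        if (dataBuffer.hasRemaining()) {
            return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
        }
        return target.hasRemaining()
            ? SerializationResult.FULL_RECORD
            : SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
    }

    // Drives the while (result.isFullBuffer()) loop from the PR.
    public List<ByteBuffer> copyAll(byte[] record, int bufferSize) {
        serializeRecord(record);
        List<ByteBuffer> buffers = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(bufferSize);
        buffers.add(current);
        SerializationResult result = copyToBufferBuilder(current);
        while (result.isFullBuffer()) {
            if (result == SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL) {
                break; // the buffer is full, but the record is complete: stop here
            }
            current = ByteBuffer.allocate(bufferSize);
            buffers.add(current);
            result = copyToBufferBuilder(current);
        }
        return buffers;
    }
}
```

The `break` on `FULL_RECORD_MEMORY_SEGMENT_FULL` mirrors the original comment about avoiding an extra buffer request that could stall the pipeline.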
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593686#comment-16593686 ] ASF GitHub Bot commented on FLINK-9913: --- NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r212969997 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ## @@ -40,25 +41,25 @@ @Test public void testHasSerializedData() throws IOException { - final int segmentSize = 16; - final SpanningRecordSerializer serializer = new SpanningRecordSerializer<>(); final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT); Assert.assertFalse(serializer.hasSerializedData()); - serializer.addRecord(randomIntRecord); + serializer.serializeRecord(randomIntRecord); Assert.assertTrue(serializer.hasSerializedData()); - serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize)); + final BufferBuilder bufferBuilder1 = createBufferBuilder(16); + serializer.copyToBufferBuilder(bufferBuilder1); Assert.assertFalse(serializer.hasSerializedData()); - serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8)); - - serializer.addRecord(randomIntRecord); + final BufferBuilder bufferBuilder2 = createBufferBuilder(8); + serializer.serializeRecord(randomIntRecord); + serializer.copyToBufferBuilder(bufferBuilder2); Assert.assertFalse(serializer.hasSerializedData()); - serializer.addRecord(randomIntRecord); + serializer.serializeRecord(randomIntRecord); Review comment: why don't you use `serializer.reset()` here? (serialize only once as in production code) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593683#comment-16593683 ]

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212980692

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java

## @@ -524,6 +615,31 @@
 	public void read(DataInputView in) throws IOException {
 	}
 }

+	/**
+	 * Broadcast channel selector that selects all the output channels.
+	 */
+	private static class Broadcast implements ChannelSelector {
+
+		private int[] returnChannel;
+		boolean set;

Review comment: Actually, this is a copy of `org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner`, which lives in the `flink-streaming-java` submodule. In general, it is good to cache this array rather than building a new one for every record. Using `returnChannel.length == numberOfOutputChannels` makes sense, though. @zhijiangW, can you also create a hotfix commit changing this in `org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner`?
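The caching suggested in the review can be sketched as follows. The names mirror the test's `Broadcast` selector, but this is an illustrative model rather than Flink's actual `ChannelSelector` implementation:

```java
public class BroadcastSelectorSketch {

    // Cached channel array, rebuilt only when the channel count changes.
    private int[] returnChannel = new int[0];

    /** Returns all channel indices, reusing the cached array across records. */
    public int[] selectChannels(int numberOfOutputChannels) {
        if (returnChannel.length != numberOfOutputChannels) {
            returnChannel = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnChannel[i] = i;
            }
        }
        return returnChannel;
    }
}
```

Comparing `returnChannel.length` against the current channel count avoids both a boolean flag and a fresh array allocation per record.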
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593687#comment-16593687 ]

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212970648

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java

## @@ -106,17 +102,19 @@
 			public boolean equals(Object obj) {
 			}
 		};

-		RecordSerializer.SerializationResult result = serializer.addRecord(emptyRecord);
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+		serializer.serializeRecord(emptyRecord);
+		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));

-		result = serializer.addRecord(emptyRecord);
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
+		serializer.serializeRecord(emptyRecord);

Review comment: why don't you use `serializer.reset()` here? (serialize only once as in production code)
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593680#comment-16593680 ]

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212974745

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java

## @@ -377,6 +388,86 @@
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}

+	/**
+	 * Tests that records are broadcast via {@link ChannelSelector} and
+	 * {@link RecordWriter#emit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+	}
+
+	/**
+	 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testBroadcastEmitRecord() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+	}
+
+	/**
+	 * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+	 * that is all the target channels can receive the whole outputs.
+	 *
+	 * @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+	 */
+	private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+		final int numChannels = 4;
+		final int bufferSize = 32;
+		final int numValues = 8;
+		final int serializationLength = 4;
+
+		@SuppressWarnings("unchecked")
+		final Queue[] queues = new Queue[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			queues[i] = new ArrayDeque<>();
+		}
+
+		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+		final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());

Review comment: The `RecordWriter` instance should be different depending on `isBroadcastEmit` to really separate these two cases?
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593682#comment-16593682 ]

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212969980

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java

## @@ -40,25 +41,25 @@
 	@Test
 	public void testHasSerializedData() throws IOException {
-		final int segmentSize = 16;
-
 		final SpanningRecordSerializer serializer = new SpanningRecordSerializer<>();
 		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);

 		Assert.assertFalse(serializer.hasSerializedData());
-		serializer.addRecord(randomIntRecord);
+		serializer.serializeRecord(randomIntRecord);
 		Assert.assertTrue(serializer.hasSerializedData());

-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+		final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
+		serializer.copyToBufferBuilder(bufferBuilder1);
 		Assert.assertFalse(serializer.hasSerializedData());

-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(8));
-
-		serializer.addRecord(randomIntRecord);
+		final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
+		serializer.serializeRecord(randomIntRecord);

Review comment: why don't you use `serializer.reset()` here? (serialize only once as in production code)
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593685#comment-16593685 ]

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212972706

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java

## @@ -169,25 +167,26 @@
 	private void test(Util.MockRecords records, int segmentSize) throws Exception {
 		// -------------------------------------------------------------------

-		serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
-
+		BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
 		int numBytes = 0;
 		for (SerializationTestType record : records) {
-			RecordSerializer.SerializationResult result = serializer.addRecord(record);
+			serializer.serializeRecord(record);
+			RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);

 			numBytes += record.length() + serializationOverhead;

 			if (numBytes < segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 			} else if (numBytes == segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
-				serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+				bufferBuilder = createBufferBuilder(segmentSize);
 				numBytes = 0;
 			} else {
 				Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);

 				while (result.isFullBuffer()) {
 					numBytes -= segmentSize;
-					result = serializer.continueWritingWithNextBufferBuilder(createBufferBuilder(segmentSize));
+					bufferBuilder = createBufferBuilder(segmentSize);
+					result = serializer.copyToBufferBuilder(bufferBuilder);

Review comment: I know this wasn't checked before, but should we actually also check for a full record after this `while` loop?
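The three assertion branches in the quoted test encode the result semantics of one copy step. Expressed as a minimal model (not Flink's actual `SerializationResult` implementation, just the classification the test relies on):

```java
public class SerializationResultSketch {

    enum Result { FULL_RECORD, FULL_RECORD_MEMORY_SEGMENT_FULL, PARTIAL_RECORD_MEMORY_SEGMENT_FULL }

    /** Classifies one copy step from the bytes written so far versus the segment size. */
    static Result classify(int numBytes, int segmentSize) {
        if (numBytes < segmentSize) {
            return Result.FULL_RECORD;                        // record fits, segment has room left
        } else if (numBytes == segmentSize) {
            return Result.FULL_RECORD_MEMORY_SEGMENT_FULL;    // record fits, segment exactly full
        } else {
            return Result.PARTIAL_RECORD_MEMORY_SEGMENT_FULL; // record spills into the next segment
        }
    }
}
```

The spilling case is the one the test's inner `while` loop handles: it keeps requesting new segments until the copy step reports the buffer is no longer full.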
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593684#comment-16593684 ]

ASF GitHub Bot commented on FLINK-9913:
---

NicoK commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r212952611

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

## @@ -87,62 +86,71 @@
 	public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
 		this.numChannels = writer.getNumberOfSubpartitions();

-		/*
-		 * The runtime exposes a channel abstraction for the produced results
-		 * (see {@link ChannelSelector}). Every channel has an independent
-		 * serializer.
-		 */
-		this.serializers = new SpanningRecordSerializer[numChannels];
+		this.serializer = new SpanningRecordSerializer();

 		this.bufferBuilders = new Optional[numChannels];
 		for (int i = 0; i < numChannels; i++) {
-			serializers[i] = new SpanningRecordSerializer();
 			bufferBuilders[i] = Optional.empty();
 		}
 	}

 	public void emit(T record) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
 		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-			sendToTarget(record, targetChannel);
+			copyToTarget(targetChannel);
 		}
+
+		// Make sure we don't hold onto the large intermediate serialization buffer for too long
+		serializer.prune();
 	}

 	/**
 	 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
 	 * the {@link ChannelSelector}.
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
+		serializer.serializeRecord(record);
+
 		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			sendToTarget(record, targetChannel);
+			copyToTarget(targetChannel);
 		}
+
+		serializer.prune();
 	}

 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		sendToTarget(record, rng.nextInt(numChannels));
-	}
+		serializer.serializeRecord(record);

-	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
-		RecordSerializer serializer = serializers[targetChannel];
+		copyToTarget(rng.nextInt(numChannels));

-		SerializationResult result = serializer.addRecord(record);
+		serializer.prune();
+	}

+	private void copyToTarget(int targetChannel) throws IOException, InterruptedException {
+		// We should reset the initial position of the intermediate serialization buffer before
+		// copying, so the serialization results can be copied to multiple target buffers.
+		serializer.reset();
+
+		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
 		while (result.isFullBuffer()) {
-			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
-				// If this was a full record, we are done. Not breaking
-				// out of the loop at this point will lead to another
-				// buffer request before breaking out (that would not be
-				// a problem per se, but it can lead to stalls in the
-				// pipeline).
-				if (result.isFullRecord()) {
-					break;
-				}
+			tryFinishCurrentBufferBuilder(targetChannel);

Review comment: I was a bit skeptical about the removal of the return value of `tryFinishCurrentBufferBuilder()` at first, but I don't see a reason to tie breaking out of the loop on full records to actually having a buffer builder present; once we have written the complete record, we can break out. Therefore it is OK, and probably better than before.
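The per-channel copy step above (reset, then copy until the record no longer spills) can be modeled in isolation. This is an illustrative sketch, with plain byte arrays standing in for the `BufferBuilder`s handed out by `getBufferBuilder(...)`:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class CopyLoopSketch {

    /**
     * Copies one serialized record into fixed-size target buffers, requesting a
     * fresh buffer whenever the current one fills up (the spanning-record case).
     */
    static List<byte[]> copyToTargets(ByteBuffer serialized, int targetSize) {
        serialized.rewind();                   // reset() before copying to this channel
        List<byte[]> targets = new ArrayList<>();
        while (serialized.hasRemaining()) {
            int length = Math.min(targetSize, serialized.remaining());
            byte[] target = new byte[length];  // stand-in for requesting a new BufferBuilder
            serialized.get(target);
            targets.add(target);               // buffer full or record finished; hand it off
        }
        return targets;
    }
}
```

Because the intermediate buffer is only rewound, never consumed destructively, the same loop can run once per target channel over a single serialization result.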
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582304#comment-16582304 ]

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413491900

Nice find. I missed that :)
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582193#comment-16582193 ]

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413468551

@pnowojski, I think I found the reason for the regression in some non-broadcast cases. The key point is when to call `RecordSerializer#prune()`, which is used to shrink the intermediate serialization buffer. In the past, the prune method was only called once the target buffer was full and the record fully copied. But now we call the prune method after emitting every record in order to shrink the intermediate buffer ASAP, so performance may regress in sensitive job scenarios. I will modify the code to keep the previous pruning behavior in the serializer.

Thanks for your benchmark for finding this potential issue. I only verified the broadcast scenarios in the benchmark before, and the obvious advantage there may have hidden the potential regression. :)
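The pruning policy under discussion can be sketched as follows. The threshold and sizes are made-up illustrative values, not Flink's actual constants: the point is that `prune()` shrinks the intermediate buffer only when it has grown large, rather than reallocating after every record.

```java
public class PruneSketch {

    static final int INITIAL_SIZE = 128;
    static final int PRUNE_THRESHOLD = 4096; // illustrative threshold, not Flink's

    byte[] intermediateBuffer = new byte[INITIAL_SIZE];

    /** Grows the intermediate buffer to hold a large serialized record. */
    void ensureCapacity(int needed) {
        if (needed > intermediateBuffer.length) {
            intermediateBuffer = new byte[needed];
        }
    }

    /** Shrinks the buffer back only when it grew past the threshold. */
    void prune() {
        if (intermediateBuffer.length > PRUNE_THRESHOLD) {
            intermediateBuffer = new byte[INITIAL_SIZE];
        }
    }
}
```

Calling an unconditional shrink after every record would force a reallocation on the next sizable record, which is the kind of overhead the comment identifies as the regression source.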
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580936#comment-16580936 ]

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413167382

@zhijiangW yes, those results from our internal benchmarking tool are a little bit strange, since I do not see any obvious place that could cause them. However, the regression was visible across the board of various cases (none of them used broadcasting extensively), and so far we haven't seen such a large false-positive error.
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580932#comment-16580932 ]

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r210236329

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java

## @@ -66,29 +66,33 @@
 	public boolean isFullBuffer() {
 	}

 	/**
-	 * Starts serializing and copying the given record to the target buffer
-	 * (if available).
+	 * Starts serializing the given record to an intermediate data buffer.
 	 *
 	 * @param record the record to serialize
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
 	 */
-	SerializationResult addRecord(T record) throws IOException;
+	void serializeRecord(T record) throws IOException;

Review comment: So maybe keep it as it is. Maybe in the future someone will be struck by a better idea.
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580829#comment-16580829 ]

ASF GitHub Bot commented on FLINK-9913:
---

zhijiangW edited a comment on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-413127511

For the benchmark, I also created a new job with only a source and a map vertex, where the source broadcasts a Long value to all the maps. I executed the same job with parallelism 1:100 and 1:200 separately, and the throughput increased by 13% and 15%, respectively:

```
parallelism | throughput (before)    | throughput (now)
1:100       | 70.760 ± 10.557 ops/ms | 83.480 ± 1.967 ops/ms
1:200       | 37.756 ± 1.170 ops/ms  | 43.316 ± 2.176 ops/ms
```

In theory, we only reduce the number of serialization passes and do not introduce any time-cost operations. I will further verify with the `StreamNetworkThroughputBenchmarkExecutor` you mentioned and share the results once done. :)
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580825#comment-16580825 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#issuecomment-413127511 For benchmark, I also create a new job with only source and map vertex, and the source broadcast Long type value to all the maps. I execute the same job for setting the parallelism 1:100 and 1:200 separately, and the throughput increases 13%, 15% separately in the following: parallelism| throughput(before) | throughput(now) 1:100| 70.760 ± 10.557 ops/ms | 83.480 ± 1.967 ops/ms 1:200| 37.756 ± 1.170 ops/ms | 43.316 ± 2.176 ops/ms In theory, we only reduce the number of serialization times and do not introduce any time-cost operations. I will further verify the `StreamNetworkThroughputBenchmarkExecutor` as you mentioned and show your results after done. :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. 
> As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. > # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580814#comment-16580814 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r210195493 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ## @@ -66,29 +66,33 @@ public boolean isFullBuffer() { } /** -* Starts serializing and copying the given record to the target buffer -* (if available). +* Starts serializing the given record to an intermediate data buffer. * * @param record the record to serialize -* @return how much information was written to the target buffer and -* whether this buffer is full */ - SerializationResult addRecord(T record) throws IOException; + void serializeRecord(T record) throws IOException; Review comment: I agree with not exposing `RecordSerializer`'s internal private fields to `SerializedRecord` out of safety concerns. Regarding referencing the `RecordSerializer` directly in `SerializedRecord`, the whole process in `RecordWriter` would look like this:
```
SerializedRecord serializedRecord = serializer.serializeRecord(record);

for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
    // reset position for multiple copyings
    serializer.reset();

    BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
    CopyingResult result = serializedRecord.copyToBufferBuilder(bufferBuilder);
}
serializer.prune();
```
The `SerializedRecord` only provides the method `copyToBufferBuilder` and wraps the internal class `CopyingResult`, but the actual copying operation is still done in `RecordSerializer`. So the `RecordSerializer` also needs to provide the method `copyToBufferBuilder`.
From this point of view, we do not actually separate the two steps of `serialization` and `copy` out of `RecordSerializer`, so I think it may not be worth doing. :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579907#comment-16579907 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#issuecomment-412900643 One piece of possibly bad news: I have run the benchmarks defined in https://github.com/dataArtisans/flink-benchmarks on this branch, and quite a lot of them show a performance regression. The worst was `StreamNetworkThroughputBenchmarkExecutor` with the `1,100ms` params, a regression of ~18%. Could you run those benchmarks locally and confirm whether that's the case?
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579689#comment-16579689 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209921447 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ## @@ -66,29 +66,33 @@ public boolean isFullBuffer() { } /** -* Starts serializing and copying the given record to the target buffer -* (if available). +* Starts serializing the given record to an intermediate data buffer. * * @param record the record to serialize -* @return how much information was written to the target buffer and -* whether this buffer is full */ - SerializationResult addRecord(T record) throws IOException; + void serializeRecord(T record) throws IOException; Review comment: Re 1. I was thinking about something along those lines; however, I saw the need for `close()` because currently `SpanningRecordSerializer` reuses the same `byte[]` for multiple records, which hides in the `dataBuffer = serializationBuffer.wrapAsByteBuffer();` call. Is there some other way to release/return this array back to `RecordSerializer`? Or maybe that proves that `RecordSerializer` and `SerializedRecord` should indeed be one class? Re 2. I'm not sure. This one seems like it would leak `RecordSerializer serializer` private fields to `SerializedRecord`. All in all, I'm not sure the improvement here is worth the effort, even if it would simplify the code (this `close()` especially bothers me).
Also, I agree that your changes to `addRecord`, `continueWritingWithNextBufferBuilder` and `copyToBufferBuilder` actually make this code easier to follow, so I would be fine with merging it as is (modulo my other comments) :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579665#comment-16579665 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209289603 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); + final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( + new String[]{ tempFolder.getRoot().getAbsolutePath() }); + + final ArrayDeque serializedRecords = new ArrayDeque<>(); + final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT); + for (SerializationTestType record : records) { + serializedRecords.add(record); + + if (isBroadcastEmit) { + writer.broadcastEmit(record); + } else { + writer.emit(record); + } + } + + final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength)); + for (int i = 0; i < numChannels; i++) { Review comment: can you somehow extract common logic of this method and `SpanningRecordSerializationTest#testSerializationRoundTrip(Iterable, int, RecordSerializer, RecordDeserializer)`? They share a lot of code.
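For reference, the common round-trip logic the reviewer asks to extract could live in a small shared helper along these lines. This is a sketch with assumed names and plain `int` values standing in for `SerializationTestType`; it is not the actual Flink test utility:

```java
import java.nio.ByteBuffer;
import java.util.Queue;

// Hypothetical shared helper: drain serialized buffers, deserialize each int
// value, and compare against the queue of originally emitted values.
class RoundTripUtil {
    static void assertRoundTrip(Queue<Integer> expected, Iterable<ByteBuffer> buffers) {
        for (ByteBuffer buffer : buffers) {
            while (buffer.remaining() >= Integer.BYTES) {
                int value = buffer.getInt();
                Integer head = expected.poll();
                if (head == null || head != value) {
                    throw new AssertionError("unexpected value " + value);
                }
            }
        }
        if (!expected.isEmpty()) {
            throw new AssertionError(expected.size() + " values were never deserialized");
        }
    }
}
```

Both the broadcast test and the serialization round-trip test could then feed their collected buffers through one helper instead of duplicating the drain-and-compare loop.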
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578048#comment-16578048 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#issuecomment-412469613 @pnowojski , thanks for your reviews! I basically agree with your idea of separating the current `RecordSerializer` further, but considering the specific implementation, there are still some issues to confirm. After we reach agreement, I will continue with the test issues. :)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578040#comment-16578040 ] ASF GitHub Bot commented on FLINK-9913: --- zhijiangW commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209551509 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ## @@ -66,29 +66,33 @@ public boolean isFullBuffer() { } /** -* Starts serializing and copying the given record to the target buffer -* (if available). +* Starts serializing the given record to an intermediate data buffer. * * @param record the record to serialize -* @return how much information was written to the target buffer and -* whether this buffer is full */ - SerializationResult addRecord(T record) throws IOException; + void serializeRecord(T record) throws IOException; Review comment: The previous `RecordSerializer` also confused me a lot, and I have had the same experience as you, because the previous `addRecord` and `continueWritingWithNextBufferBuilder` methods could be called in arbitrary sequence and both returned a `SerializationResult`. In my current reconstruction, the method `serializeRecord` must be called first, and then `copyToBufferBuilder` is called to return the final `SerializationResult`. I think that is a bit clearer than before. I agree your idea above is good for separating these two methods further, but `RecordSerializer` and `SerializedRecord` may still be closely coupled. I think there are two ways to realize `SerializedRecord#copyToBufferBuilder`:
1.
```
public SerializedRecord(ByteBuffer lengthBuffer, ByteBuffer dataBuffer) {
}

CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder) {
    // copy lengthBuffer
    // copy dataBuffer
    // get CopyingResult
}
```
This way the `SerializedRecord` can only see `lengthBuffer` and `dataBuffer`, and cannot interact with `RecordSerializer`. Maybe we would not need to do anything in `SerializedRecord#close()`.
2.
```
public SerializedRecord(RecordSerializer serializer) {
}

CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder) {
    serializer.copyToBufferBuilder();
    // get CopyingResult
}
```
This way the `SerializedRecord` can see and interact with `RecordSerializer`, but the only difference seems to be that we separate `SerializedRecord` and `CopyingResult`. My current implementation hides the `SerializedRecord` and returns a `SerializationResult`, which corresponds to `CopyingResult`, as the final result. What do you think of the above ways?
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576424#comment-16576424 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209284512 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -188,24 +192,32 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { /** * Marks the current {@link BufferBuilder} as finished and clears the state for next one. -* -* @return true if some data were written */ - private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer serializer) { - - if (!bufferBuilders[targetChannel].isPresent()) { - return false; + private void tryFinishCurrentBufferBuilder(int targetChannel) { + if (bufferBuilders[targetChannel].isPresent()) { + BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + bufferBuilders[targetChannel] = Optional.empty(); + numBytesOut.inc(bufferBuilder.finish()); } - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); - bufferBuilders[targetChannel] = Optional.empty(); + } - numBytesOut.inc(bufferBuilder.finish()); - serializer.clear(); - return true; + /** +* The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need +* request a new one for this target channel. +*/ + @Nonnull Review comment: Imo you don't have to add `@Nonnull` annotation. I'm implicitly assuming that any non `@Nullable` marked field is automatically `@Nonnull`.
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576425#comment-16576425 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209290889 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ## @@ -66,29 +66,33 @@ public boolean isFullBuffer() { } /** -* Starts serializing and copying the given record to the target buffer -* (if available). +* Starts serializing the given record to an intermediate data buffer. * * @param record the record to serialize -* @return how much information was written to the target buffer and -* whether this buffer is full */ - SerializationResult addRecord(T record) throws IOException; + void serializeRecord(T record) throws IOException; Review comment: I'm thinking about refactoring this class and splitting it into two:
```
class RecordSerializer {
    SerializedRecord serializeRecord(T record);
}

class SerializedRecord implements AutoCloseable {
    CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder);

    void close() {
        serializer.prune();
        // and code to return state (serializationBuffer) to serializer for reuse
    }
}
```
and usage:
```
public void randomEmit(T record) throws IOException, InterruptedException {
    try (SerializedRecord serializedRecord = serializer.serializeRecord(record)) {
        copyToTarget(serializedRecord, rng.nextInt(numChannels));
    }
}
```
Something in the current `RecordSerializer` has always been tickling my brain: it is confusing to me, and I have to check its implementation whenever I revisit the code. Maybe with this split it would be easier to understand? But I'm not sure about this. What do you think?
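To illustrate the proposed split, here is a self-contained mock-up. All names are assumed for illustration: `ByteBuffer` stands in for the serializer's internal `serializationBuffer`, and `release()` stands in for `prune()` plus returning the reused buffer; this is not Flink's actual implementation:

```java
import java.nio.ByteBuffer;

// Hypothetical serializer that hands out an AutoCloseable SerializedRecord,
// which returns the reused internal buffer on close() (per the review idea).
class SketchSerializer {
    private final ByteBuffer reusedBuffer = ByteBuffer.allocate(64); // reused across records
    private boolean lent;

    SerializedRecord serializeRecord(byte[] payload) {
        if (lent) {
            throw new IllegalStateException("previous record not closed");
        }
        reusedBuffer.clear();
        reusedBuffer.put(payload);
        reusedBuffer.flip();
        lent = true;
        return new SerializedRecord(this, reusedBuffer);
    }

    void release() { // stands in for prune() / returning the buffer
        lent = false;
    }
}

class SerializedRecord implements AutoCloseable {
    private final SketchSerializer owner;
    private final ByteBuffer data;

    SerializedRecord(SketchSerializer owner, ByteBuffer data) {
        this.owner = owner;
        this.data = data;
    }

    // Copy the serialized bytes into a target buffer; rewind first so the
    // same record can be copied to multiple channels.
    int copyToBufferBuilder(ByteBuffer target) {
        data.rewind();
        int copied = Math.min(data.remaining(), target.remaining());
        for (int i = 0; i < copied; i++) {
            target.put(data.get());
        }
        return copied;
    }

    @Override
    public void close() {
        owner.release();
    }
}

class TryWithResourcesDemo {
    public static void main(String[] args) {
        SketchSerializer serializer = new SketchSerializer();
        try (SerializedRecord rec = serializer.serializeRecord("hello".getBytes())) {
            System.out.println(rec.copyToBufferBuilder(ByteBuffer.allocate(16)));
            System.out.println(rec.copyToBufferBuilder(ByteBuffer.allocate(16)));
        } // close() hands the reused buffer back to the serializer
    }
}
```

The try-with-resources block makes the lifecycle explicit: the record may be copied to any number of channels, and the shared `byte[]` is only reclaimed when the record goes out of scope, which is exactly the `close()` concern discussed above.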
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576422#comment-16576422 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209290136 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException { } } + /** +* Broadcast channel selector that selects all the output channels. +*/ + private static class Broadcast implements ChannelSelector { + + private int[] returnChannel; + boolean set; Review comment: 1. Do we need to cache `returnChannel`? Does it give any meaningful test execution speed-up? 2. If so, instead of using `set` and `setNumber`, just check whether `returnChannel.length == numberOfOutputChannels`; if not, create a new one.
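The reviewer's second suggestion (drop the `set`/`setNumber` flags and key the cache on the array length) could be sketched like this. The class name is hypothetical; this is not the actual test selector:

```java
// Hypothetical broadcast selector that caches the channel array and rebuilds
// it only when the number of output channels changes, per the review comment.
class CachingBroadcastSelector {
    private int[] returnChannel = new int[0];

    int[] selectChannels(int numberOfOutputChannels) {
        // the length check replaces the separate "set" boolean flag
        if (returnChannel.length != numberOfOutputChannels) {
            returnChannel = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnChannel[i] = i; // broadcast: select every output channel
            }
        }
        return returnChannel;
    }
}
```

The length comparison makes the cache self-validating: repeated calls with the same channel count return the same array, while a changed count transparently rebuilds it.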
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576423#comment-16576423 ]

ASF GitHub Bot commented on FLINK-9913:
---
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209289603

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java

## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 }

+ /**
+ * Tests that records are broadcast via {@link ChannelSelector} and
+ * {@link RecordWriter#emit(IOReadableWritable)}.
+ */
+ @Test
+ public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+ emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+ }
+
+ /**
+ * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+ */
+ @Test
+ public void testBroadcastEmitRecord() throws Exception {
+ emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+ }
+
+ /**
+ * The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+ * that is, all the target channels receive the complete output.
+ *
+ * @param isBroadcastEmit whether to use {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+ */
+ private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+ final int numChannels = 4;
+ final int bufferSize = 32;
+ final int numValues = 8;
+ final int serializationLength = 4;
+
+ @SuppressWarnings("unchecked")
+ final Queue[] queues = new Queue[numChannels];
+ for (int i = 0; i < numChannels; i++) {
+ queues[i] = new ArrayDeque<>();
+ }
+
+ final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+ final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+ final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
+ final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
+ new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+ final ArrayDeque serializedRecords = new ArrayDeque<>();
+ final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+ for (SerializationTestType record : records) {
+ serializedRecords.add(record);
+
+ if (isBroadcastEmit) {
+ writer.broadcastEmit(record);
+ } else {
+ writer.emit(record);
+ }
+ }
+
+ final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength));
+ for (int i = 0; i < numChannels; i++) {

Review comment: can you somehow extract the common logic of this method and `org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest#testSerializationRoundTrip(java.lang.Iterable, int, org.apache.flink.runtime.io.network.api.serialization.RecordSerializer, org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer)`? They share a lot of the core logic.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Improve output serialization only once in RecordWriter
> ------------------------------------------------------
>
>                 Key: FLINK-9913
>                 URL: https://issues.apache.org/jira/browse/FLINK-9913
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.6.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
> Currently the {{RecordWriter}} emits output into multiple channels via a {{ChannelSelector}} or broadcasts output to all channels directly. Each channel has a separate {{RecordSerializer}} for serializing outputs, which means the output will be serialized as many times as the number of selected channels.
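The buffer count asserted in the test above follows from simple arithmetic: each INT record occupies its 4 payload bytes plus another 4 bytes of framing (presumably the length prefix the serializer writes; an assumption here), so a 32-byte buffer holds 4 records and 8 records need 2 buffers per channel. A quick standalone check, with names mirroring the test:

```java
// Standalone check of the test's buffer arithmetic; names mirror the test.
public class RequiredBuffersCalc {
    public static void main(String[] args) {
        final int bufferSize = 32;           // bytes per target buffer
        final int numValues = 8;             // records emitted per channel
        final int serializationLength = 4;   // payload bytes of one INT record
        final int lengthField = 4;           // per-record framing (assumed length prefix)

        int recordsPerBuffer = bufferSize / (lengthField + serializationLength);
        int requiredBuffers = numValues / recordsPerBuffer;

        if (recordsPerBuffer != 4 || requiredBuffers != 2) {
            throw new AssertionError("unexpected arithmetic");
        }
        System.out.println("each channel needs " + requiredBuffers + " buffers");
    }
}
```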
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555886#comment-16555886 ]

ASF GitHub Bot commented on FLINK-9913:
---
zhijiangW commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-407807509

@pnowojski , I have submitted the code for the serialization improvement as we agreed before. I would appreciate your review when you have time. :)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555487#comment-16555487 ]

ASF GitHub Bot commented on FLINK-9913:
---
GitHub user zhijiangW opened a pull request: https://github.com/apache/flink/pull/6417
[FLINK-9913][runtime] Improve output serialization only once in RecordWriter

## What is the purpose of the change

*This pull request serializes the output only once for multiple target channels in `RecordWriter`, rather than as many times as the number of selected channels.*

## Brief change log

- *Only one `RecordSerializer` is created for all the output channels in `RecordWriter`*
- *Restructure the processes of `emit`, `broadcastEmit`, `randomEmit` in `RecordWriter`*
- *Restructure the interface methods in `RecordSerializer`*

## Verifying this change

This change is already covered by existing tests, such as *SpanningRecordSerializationTest*, etc. New tests in `RecordWriterTest` verify that:

- *The serialization results are correct for `RecordWriter#emit` with `BroadcastPartitioner`*
- *The serialization results are correct for `RecordWriter#broadcastEmit` directly*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (yes)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-9913

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6417.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6417

commit 109ddb37abafcea28478b90cda10b965e0c399d5
Author: Zhijiang
Date: 2018-07-25T05:45:23Z

    [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
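The restructured `RecordSerializer` interface discussed in this thread (`serializeRecord`, `copyToBufferBuilder`, `prune`, `reset`) implies an emit loop that keeps copying the intermediate buffer into fresh target buffers until the whole record fits. Below is a simplified, self-contained sketch of that loop; `ByteBuffer`s stand in for the real serializer and `BufferBuilder`, and all names are illustrative rather than Flink's actual classes:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified stand-in for the reworked serializer/writer interaction:
// serialize once into an intermediate buffer, then copy it into as many
// small target buffers as needed (cf. serializeRecord / copyToBufferBuilder).
public class EmitFlowSketch {
    private ByteBuffer intermediate;                        // intermediate serialization buffer
    final Deque<ByteBuffer> finishedBuffers = new ArrayDeque<>();

    void serializeRecord(byte[] payload) {
        intermediate = ByteBuffer.wrap(payload);            // "serialize" exactly once
    }

    // Copies as much as fits; returns true once the whole record is copied
    // (the real interface returns a SerializationResult instead).
    boolean copyToBufferBuilder(ByteBuffer target) {
        while (intermediate.hasRemaining() && target.hasRemaining()) {
            target.put(intermediate.get());
        }
        return !intermediate.hasRemaining();
    }

    void emit(byte[] payload, int targetBufferSize) {
        serializeRecord(payload);
        ByteBuffer target = ByteBuffer.allocate(targetBufferSize);
        while (!copyToBufferBuilder(target)) {              // record spans buffers
            finishedBuffers.add(target);                    // hand off the full buffer
            target = ByteBuffer.allocate(targetBufferSize); // request a new one
        }
        finishedBuffers.add(target);
    }

    public static void main(String[] args) {
        EmitFlowSketch writer = new EmitFlowSketch();
        writer.emit(new byte[10], 4);                       // 10-byte record, 4-byte buffers
        if (writer.finishedBuffers.size() != 3) {
            throw new AssertionError("expected the record to span 3 buffers");
        }
        System.out.println("buffers used: " + writer.finishedBuffers.size());
    }
}
```

In the real implementation, `prune()` would additionally shrink the intermediate buffer after an unusually large record, so that one big record does not pin memory for the lifetime of the writer.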