[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217655518

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

```
@@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, In
 	}
 
 	private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(!bufferBuilders[targetChannel].isPresent());
```

Review comment: This `checkState` was preventing some bugs and data losses. Maybe replace with:

```
checkState(!bufferBuilders[targetChannel].isPresent() || bufferBuilders[targetChannel].isFinished());
```

?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217654112

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

```
@@ -113,11 +113,7 @@ public void broadcastEmit(T record) throws IOException, InterruptedException {
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		serializer.serializeRecord(record);
-
-		if (copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
-			serializer.prune();
-		}
+		emit(record, new int[] { rng.nextInt(numChannels) });
```

Review comment: Wait a minute here.

```
emit(record, new int[] { rng.nextInt(numChannels) });
```

@NicoK this is an actual performance regression. Creating a single-element int array (once per record!) actually reduces the throughput by about 10%-20% (tested during one of the hackathons). Please revert to using `copyFromSerializerToTargetChannel(rng.nextInt(numChannels))`.
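For illustration, the allocation pattern being flagged can be shown in isolation. A hedged sketch (class and method names are made up for this example, not Flink's API) contrasting the per-record array allocation with passing the channel index directly:

```java
import java.util.Random;

// Illustrative sketch, not Flink code: shows the two call shapes discussed above.
public class RandomEmitSketch {
    private final Random rng = new Random();
    private final int numChannels;

    public RandomEmitSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    // Variant flagged in the review: allocates a one-element array per record,
    // which on a hot path (millions of records per second) shows up as GC pressure.
    public int[] selectChannelAllocating() {
        return new int[] { rng.nextInt(numChannels) };
    }

    // Suggested shape: pass the single channel index directly, no allocation.
    public int selectChannelDirect() {
        return rng.nextInt(numChannels);
    }
}
```

The broadcast path in the diff sidesteps the same problem differently: it precomputes a `broadcastChannels` array once in the constructor and reuses it for every record.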
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r215613344 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,101 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emitToTargetChannels(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emitToTargetChannels(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyToTargetBuffers(rng.nextInt(numChannels))) { + serializer.prune(); + } } - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + private void emitToTargetChannels(T record, int[] targetChannels) throws IOException, InterruptedException { Review comment: maybe rename just to `emit`?
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r215611369 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { * Marks the current {@link BufferBuilder} as finished and clears the state for next one. */ private void tryFinishCurrentBufferBuilder(int targetChannel) { - if (bufferBuilders[targetChannel].isPresent()) { - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + Optional bufferBuilderOpt = bufferBuilders[targetChannel]; Review comment: Rename to `bufferBuilder` (marking the type in a variable name is not the best practice). Have you seen throughput improvements from introducing this local variable? If not, maybe revert the change? FYI: When I was writing this code, I didn't see any performance improvement (and I was testing this exact change). Removing one extra CPU cache read (the second `bufferBuilders[targetChannel]` access will either be optimised out or be a read from CPU caches/registers) usually hardly matters compared to taking locks :(
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r215615035 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -89,77 +88,101 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; + this.broadcastChannels = new int[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); + broadcastChannels[i] = i; bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { - for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); - } + emitToTargetChannels(record, channelSelector.selectChannels(record, numChannels)); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); - } + emitToTargetChannels(record, broadcastChannels); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); + serializer.serializeRecord(record); + + if (copyToTargetBuffers(rng.nextInt(numChannels))) { + serializer.prune(); + } } - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + private void emitToTargetChannels(T record, int[] targetChannels) throws IOException, InterruptedException { + serializer.serializeRecord(record); + + boolean pruneAfterCopying = false; + for (int channel : targetChannels) { + if (copyToTargetBuffers(channel)) { + pruneAfterCopying = true; + } + } - SerializationResult result = serializer.addRecord(record); + // Make sure we don't hold onto the large intermediate serialization buffer for too long + if (pruneAfterCopying) { + serializer.prune(); + } + } + /** +* Copies the intermediate serialization buffer to the BufferBuilder of the target channel, also +* checks to prune the intermediate buffer iif the target BufferBuilder is fulfilled and the record +* is full. +* +* @param targetChannel the target channel to get BufferBuilder +* @return true if the intermediate serialization buffer should be pruned +*/ + private boolean copyToTargetBuffers(int targetChannel) throws IOException, InterruptedException { Review comment: Third time I'm looking at this PR and third time I had to think for a minute about what this method does. I'm always forgetting that `serializer` is a class field and that this method copies from it. Maybe rename to `copyFromSerializerToTargetChannel`? Imo the rename would allow us to drop most of the javadoc and simplify it to just:

```
/**
 * @param targetChannel
 * @return true if the intermediate serialization buffer should be pruned
 */
```
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r215611481 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { * Marks the current {@link BufferBuilder} as finished and clears the state for next one. */ private void tryFinishCurrentBufferBuilder(int targetChannel) { - if (bufferBuilders[targetChannel].isPresent()) { - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + Optional bufferBuilderOpt = bufferBuilders[targetChannel]; + if (bufferBuilderOpt.isPresent()) { bufferBuilders[targetChannel] = Optional.empty(); - numBytesOut.inc(bufferBuilder.finish()); + numBytesOut.inc(bufferBuilderOpt.get().finish()); numBuffersOut.inc(); } } - + /** * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need * request a new one for this target channel. 
*/ - @Nonnull private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - if (bufferBuilders[targetChannel].isPresent()) { - return bufferBuilders[targetChannel].get(); + Optional bufferBuilderOpt = bufferBuilders[targetChannel]; + if (bufferBuilderOpt.isPresent()) { + return bufferBuilderOpt.get(); } else { return requestNewBufferBuilder(targetChannel); } } - @Nonnull private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException { checkState(!bufferBuilders[targetChannel].isPresent()); - BufferBuilder bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking(); bufferBuilders[targetChannel] = Optional.of(bufferBuilder); targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel); return bufferBuilder; } private void closeBufferBuilder(int targetChannel) { - if (bufferBuilders[targetChannel].isPresent()) { - bufferBuilders[targetChannel].get().finish(); + Optional bufferBuilderOpt = bufferBuilders[targetChannel]; Review comment: ditto: rename or inline
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r215611435 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { * Marks the current {@link BufferBuilder} as finished and clears the state for next one. */ private void tryFinishCurrentBufferBuilder(int targetChannel) { - if (bufferBuilders[targetChannel].isPresent()) { - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + Optional bufferBuilderOpt = bufferBuilders[targetChannel]; + if (bufferBuilderOpt.isPresent()) { bufferBuilders[targetChannel] = Optional.empty(); - numBytesOut.inc(bufferBuilder.finish()); + numBytesOut.inc(bufferBuilderOpt.get().finish()); numBuffersOut.inc(); } } - + /** * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need * request a new one for this target channel. */ - @Nonnull private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - if (bufferBuilders[targetChannel].isPresent()) { - return bufferBuilders[targetChannel].get(); + Optional bufferBuilderOpt = bufferBuilders[targetChannel]; Review comment: ditto: rename or inline
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r213949310 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSele this.numChannels = writer.getNumberOfSubpartitions(); - /* -* The runtime exposes a channel abstraction for the produced results -* (see {@link ChannelSelector}). Every channel has an independent -* serializer. -*/ - this.serializers = new SpanningRecordSerializer[numChannels]; + this.serializer = new SpanningRecordSerializer(); this.bufferBuilders = new Optional[numChannels]; for (int i = 0; i < numChannels; i++) { - serializers[i] = new SpanningRecordSerializer(); bufferBuilders[i] = Optional.empty(); } } public void emit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + // Make sure we don't hold onto the large intermediate serialization buffer for too long + serializer.prune(); } /** * This is used to broadcast Streaming Watermarks in-band with records. This ignores * the {@link ChannelSelector}. */ public void broadcastEmit(T record) throws IOException, InterruptedException { + serializer.serializeRecord(record); + for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { - sendToTarget(record, targetChannel); + copyToTarget(targetChannel); } + + serializer.prune(); } /** * This is used to send LatencyMarks to a random target channel. 
*/ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, rng.nextInt(numChannels)); - } + serializer.serializeRecord(record); - private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { - RecordSerializer serializer = serializers[targetChannel]; + copyToTarget(rng.nextInt(numChannels)); - SerializationResult result = serializer.addRecord(record); + serializer.prune(); + } + private void copyToTarget(int targetChannel) throws IOException, InterruptedException { + // We should reset the initial position of the intermediate serialization buffer before + // copying, so the serialization results can be copied to multiple target buffers. + serializer.reset(); + + BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); + SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); while (result.isFullBuffer()) { - if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } + tryFinishCurrentBufferBuilder(targetChannel); Review comment: When I was reviewing this part last time, I was only concerned that both `getBufferBuilder(targetChannel);` and `tryFinishCurrentBufferBuilder(targetChannel);` access `bufferBuilders[targetChannel]` twice. However, the performance penalty (if any) shouldn't be important, and I liked the new code more :) Regarding `finishCurrentBufferBuilder`: `tryFinishCurrentBufferBuilder` is currently also used in one more place (`broadcastEvent()`), so we would need to add an `if` check there preceding the call to `finishCurrentBufferBuilder`.
Either way, I don't mind; if you prefer one way over another, feel free to change :)
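The control flow under discussion (drain one serialized record into fixed-size buffers, finishing each full buffer and requesting the next) can be modeled outside Flink. A toy sketch, with all names illustrative and no claim to match Flink's `SerializationResult` API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of draining one serialized record into fixed-size buffers.
public class CopyLoopSketch {
    public static List<byte[]> copyRecord(byte[] record, int bufferSize) {
        List<byte[]> buffers = new ArrayList<>();
        int pos = 0;
        while (pos < record.length) {
            // "request a buffer builder" for this step
            int len = Math.min(bufferSize, record.length - pos);
            byte[] buffer = new byte[len];
            System.arraycopy(record, pos, buffer, 0, len);
            pos += len;
            // a buffer shorter than bufferSize means the record finished
            // mid-buffer (full record, buffer not full): the loop exits without
            // requesting another buffer, avoiding the pipeline stall the
            // original comment in the diff warns about
            buffers.add(buffer);
        }
        return buffers;
    }
}
```

For example, a 10-byte record drained into 4-byte buffers yields three buffers, the last one only partially filled.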
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r210236329

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java

```
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
 	}
 
 	/**
-	 * Starts serializing and copying the given record to the target buffer
-	 * (if available).
+	 * Starts serializing the given record to an intermediate data buffer.
 	 *
 	 * @param record the record to serialize
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
 	 */
-	SerializationResult addRecord(T record) throws IOException;
+	void serializeRecord(T record) throws IOException;
```

Review comment: So maybe keep it as it is. Maybe in the future someone will be struck by a better idea.
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209921447 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ## @@ -66,29 +66,33 @@ public boolean isFullBuffer() { } /** -* Starts serializing and copying the given record to the target buffer -* (if available). +* Starts serializing the given record to an intermediate data buffer. * * @param record the record to serialize -* @return how much information was written to the target buffer and -* whether this buffer is full */ - SerializationResult addRecord(T record) throws IOException; + void serializeRecord(T record) throws IOException; Review comment: Re 1. I was thinking about something along those lines, however I saw the need for `close()`, because currently `SpanningRecordSerializer` reuses across multiple records the `byte[]` that hides in the `dataBuffer = serializationBuffer.wrapAsByteBuffer();` call. Is there some other way to release/return this array back to `RecordSerializer`? Or maybe that proves that `RecordSerializer` and `SerializedRecord` should indeed be in one class? Re 2. I'm not sure. This one seems like it would leak `RecordSerializer serializer` private fields to `SerializedRecord`. All in all, I'm not sure the improvement here is worth the effort, even if it would improve/simplify the code (especially this `close()` bothers me). Also I agree that your changes in `addRecord`, `continueWritingWithNextBufferBuilder` and `copyToBufferBuilder` make this code easier all in all, so I would be fine with merging it as is (modulo my other comments) :)
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209289603 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); + final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( + new String[]{ tempFolder.getRoot().getAbsolutePath() }); + + final ArrayDeque serializedRecords = new ArrayDeque<>(); + final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT); + for (SerializationTestType record : records) { + serializedRecords.add(record); + + if (isBroadcastEmit) { + writer.broadcastEmit(record); + } else { + writer.emit(record); + } + } + + final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength)); + for (int i = 0; i < numChannels; i++) { Review comment: can you somehow extract common logic of this method and `SpanningRecordSerializationTest#testSerializationRoundTrip(Iterable, int, RecordSerializer, RecordDeserializer)`? They share a lot of code.
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290889

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java

```
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
 	}
 
 	/**
-	 * Starts serializing and copying the given record to the target buffer
-	 * (if available).
+	 * Starts serializing the given record to an intermediate data buffer.
 	 *
 	 * @param record the record to serialize
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
 	 */
-	SerializationResult addRecord(T record) throws IOException;
+	void serializeRecord(T record) throws IOException;
```

Review comment: I'm thinking about refactoring this class and splitting it into two:

```
class RecordSerializer {
	SerializedRecord serializeRecord(T record);
};

class SerializedRecord implements AutoCloseable {
	CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder);

	void close() {
		serializer.prune();
		// and code to return state (serializationBuffer) to serializer for reuse
	}
}
```

and usage:

```
public void randomEmit(T record) throws IOException, InterruptedException {
	try (SerializedRecord serializedRecord = serializer.serializeRecord(record)) {
		copyToTarget(serializedRecord, rng.nextInt(numChannels));
	}
}
```

Something in the current `RecordSerializer` always was/is tickling my brain: it is confusing to me and I have to check its implementation whenever I revisit the code. Maybe with this split it would be easier to understand? But I'm not sure about this. What do you think?
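To make the split proposed above concrete, here is a hedged, self-contained sketch (all class and method names are illustrative, not Flink's actual API; a real version would hand back Flink's `serializationBuffer` rather than a plain byte array). The point is that `close()` returns the reusable scratch buffer to the serializer, so call sites get try-with-resources instead of a manual `prune()`:

```java
public class SerializerSplitSketch {

    // Owns a reusable scratch buffer, lent out for one record at a time.
    static class RecordSerializer {
        private byte[] scratch = new byte[128];
        private boolean lent;

        SerializedRecord serializeRecord(byte[] recordBytes) {
            if (lent) {
                throw new IllegalStateException("previous record not closed");
            }
            if (scratch.length < recordBytes.length) {
                scratch = new byte[recordBytes.length]; // grow on demand
            }
            System.arraycopy(recordBytes, 0, scratch, 0, recordBytes.length);
            lent = true;
            return new SerializedRecord(this, scratch, recordBytes.length);
        }

        private void release() {
            lent = false; // buffer is available again for the next record
        }
    }

    // Handle over the serialized bytes; close() returns the buffer to the owner.
    static class SerializedRecord implements AutoCloseable {
        private final RecordSerializer owner;
        final byte[] data;
        final int length;

        SerializedRecord(RecordSerializer owner, byte[] data, int length) {
            this.owner = owner;
            this.data = data;
            this.length = length;
        }

        @Override
        public void close() {
            owner.release();
        }
    }
}
```

Whether this clarifies more than it costs (the `close()` obligation, the extra object per record) is exactly the trade-off the comment above is weighing.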
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290136

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java

```
@@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
 		}
 	}
 
+	/**
+	 * Broadcast channel selector that selects all the output channels.
+	 */
+	private static class Broadcast implements ChannelSelector {
+
+		private int[] returnChannel;
+		boolean set;
```

Review comment:
1. Do we need to cache `returnChannel`? Does it give any meaningful test execution speed-up?
2. If so, instead of using `set` and `setNumber`, just check whether `returnChannel.length == numberOfOutputChannels`. If not, create a new one.
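The second suggestion can be sketched as a standalone class (hypothetical, not the test's actual `Broadcast` selector): derive cache validity from the array length itself instead of tracking a separate flag:

```java
// Illustrative sketch: rebuild the cached channel array only when the
// channel count changes, so no boolean "set" flag is needed.
public class BroadcastSelectorSketch {
    private int[] returnChannel = new int[0];

    public int[] selectChannels(int numberOfOutputChannels) {
        if (returnChannel.length != numberOfOutputChannels) {
            int[] channels = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                channels[i] = i; // broadcast: every channel is selected
            }
            returnChannel = channels;
        }
        return returnChannel;
    }
}
```

Repeated calls with the same channel count return the same cached array; a call with a different count transparently rebuilds it.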
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209284512 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -188,24 +192,32 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { /** * Marks the current {@link BufferBuilder} as finished and clears the state for next one. -* -* @return true if some data were written */ - private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer serializer) { - - if (!bufferBuilders[targetChannel].isPresent()) { - return false; + private void tryFinishCurrentBufferBuilder(int targetChannel) { + if (bufferBuilders[targetChannel].isPresent()) { + BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + bufferBuilders[targetChannel] = Optional.empty(); + numBytesOut.inc(bufferBuilder.finish()); } - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); - bufferBuilders[targetChannel] = Optional.empty(); + } - numBytesOut.inc(bufferBuilder.finish()); - serializer.clear(); - return true; + /** +* The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need +* request a new one for this target channel. +*/ + @Nonnull Review comment: Imo you don't have to add the `@Nonnull` annotation. I implicitly assume that any field not marked `@Nullable` is automatically `@Nonnull`.
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209289603

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java

@@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}

+	/**
+	 * Tests that records are broadcast via {@link ChannelSelector} and
+	 * {@link RecordWriter#emit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+	}
+
+	/**
+	 * Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+	 */
+	@Test
+	public void testBroadcastEmitRecord() throws Exception {
+		emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+	}
+
+	/**
+	 * The results of emitting records via the BroadcastPartitioner or broadcasting records directly
+	 * are the same: all the target channels receive the complete output.
+	 *
+	 * @param isBroadcastEmit whether to use {@link RecordWriter#broadcastEmit(IOReadableWritable)}
+	 */
+	private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+		final int numChannels = 4;
+		final int bufferSize = 32;
+		final int numValues = 8;
+		final int serializationLength = 4;
+
+		@SuppressWarnings("unchecked")
+		final Queue[] queues = new Queue[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			queues[i] = new ArrayDeque<>();
+		}
+
+		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+		final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
+		final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
+			new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+		final ArrayDeque serializedRecords = new ArrayDeque<>();
+		final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+		for (SerializationTestType record : records) {
+			serializedRecords.add(record);
+
+			if (isBroadcastEmit) {
+				writer.broadcastEmit(record);
+			} else {
+				writer.emit(record);
+			}
+		}
+
+		final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength));
+		for (int i = 0; i < numChannels; i++) {

Review comment:
Can you somehow extract the common logic of this method and `org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest#testSerializationRoundTrip(java.lang.Iterable, int, org.apache.flink.runtime.io.network.api.serialization.RecordSerializer, org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer)`? They share a lot of code.
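The extraction the review asks for could take roughly this shape: a generic round-trip helper that both tests call with their own serializer and deserializer. This is only a sketch of the idea — Flink's `RecordSerializer`/`RecordDeserializer` are replaced with plain `java.util.function` types so the snippet stands alone, and the helper name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

class RoundTripSketch {

    // Serialize every record, then deserialize and compare against the
    // original order -- the core logic the two tests share.
    static <T, S> void verifyRoundTrip(Iterable<T> records,
                                       Function<T, S> serialize,
                                       Function<S, T> deserialize) {
        Queue<S> serialized = new ArrayDeque<>();
        Queue<T> expected = new ArrayDeque<>();
        for (T record : records) {
            expected.add(record);
            serialized.add(serialize.apply(record));
        }
        while (!serialized.isEmpty()) {
            T roundTripped = deserialize.apply(serialized.poll());
            if (!roundTripped.equals(expected.poll())) {
                throw new AssertionError("record lost or reordered in round trip");
            }
        }
    }
}
```

In the actual refactoring the per-channel queues and buffer handling would stay in `RecordWriterTest`, and only this verification loop would move to a shared helper.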