[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-14 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217655518
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -230,7 +227,6 @@ private BufferBuilder getBufferBuilder(int targetChannel) 
throws IOException, In
}
 
private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   checkState(!bufferBuilders[targetChannel].isPresent());
 
 Review comment:
   This `checkState` was preventing some bugs and data loss. Maybe replace it with:
   ```
   checkState(!bufferBuilders[targetChannel].isPresent() || 
bufferBuilders[targetChannel].isFinished());
   ```
   ?
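For illustration, a minimal, self-contained sketch of the relaxed precondition suggested above (all types here are stand-ins, not Flink's actual `BufferBuilder` or `RecordWriter`): overwriting a still-unfinished builder would silently drop its buffered data, which is the bug class the original `checkState` guarded against.

```java
import java.util.Optional;

public class PreconditionSketch {
    // Stand-in for Flink's BufferBuilder; only tracks the finished flag.
    static class BufferBuilder {
        private boolean finished;
        void finish() { finished = true; }
        boolean isFinished() { return finished; }
    }

    final Optional<BufferBuilder>[] bufferBuilders;

    @SuppressWarnings("unchecked")
    PreconditionSketch(int numChannels) {
        bufferBuilders = new Optional[numChannels];
        for (int i = 0; i < numChannels; i++) {
            bufferBuilders[i] = Optional.empty();
        }
    }

    BufferBuilder requestNewBufferBuilder(int targetChannel) {
        // Relaxed precondition: replacing a tracked builder is only legal
        // once that builder has been finished; otherwise its buffered data
        // would be silently dropped.
        if (bufferBuilders[targetChannel].isPresent()
                && !bufferBuilders[targetChannel].get().isFinished()) {
            throw new IllegalStateException("would overwrite an unfinished BufferBuilder");
        }
        BufferBuilder builder = new BufferBuilder();
        bufferBuilders[targetChannel] = Optional.of(builder);
        return builder;
    }

    public static void main(String[] args) {
        PreconditionSketch sketch = new PreconditionSketch(1);
        BufferBuilder first = sketch.requestNewBufferBuilder(0);
        first.finish(); // finished, so requesting a replacement is safe
        BufferBuilder second = sketch.requestNewBufferBuilder(0);
        if (first == second) throw new AssertionError();
        System.out.println("ok");
    }
}
```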
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services



[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-14 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r217654112
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -113,11 +113,7 @@ public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   serializer.serializeRecord(record);
-
-   if 
(copyFromSerializerToTargetChannel(rng.nextInt(numChannels))) {
-   serializer.prune();
-   }
+   emit(record, new int[] { rng.nextInt(numChannels) });
 
 Review comment:
   Wait a minute here. 
   
   ```
   emit(record, new int[] { rng.nextInt(numChannels) });
   ```
   
   @NicoK this is an actual performance regression. Creating a single-element int array (once per record!) actually reduces the throughput by about 10%-20% (tested during one of the hackathons). Please revert to using `copyFromSerializerToTargetChannel(rng.nextInt(numChannels))`.
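A hedged sketch of the two variants under discussion (names and bodies are illustrative stand-ins, not the real `RecordWriter` methods): the first allocates a one-element `int[]` per record, the second writes to the chosen channel directly and allocates nothing.

```java
import java.util.Random;

public class RandomEmitSketch {
    private final Random rng = new Random();
    private final int numChannels = 4;
    int lastChannel = -1; // records which channel was written, for the demo

    // Regressing variant: allocates a single-element int[] for every record.
    void randomEmitWithArray() {
        emit(new int[] { rng.nextInt(numChannels) });
    }

    // Allocation-free variant: copy straight to one randomly chosen channel.
    void randomEmitDirect() {
        copyToTargetChannel(rng.nextInt(numChannels));
    }

    private void emit(int[] targetChannels) {
        for (int channel : targetChannels) {
            copyToTargetChannel(channel);
        }
    }

    private void copyToTargetChannel(int channel) {
        lastChannel = channel; // stand-in for the actual buffer copy
    }

    public static void main(String[] args) {
        RandomEmitSketch sketch = new RandomEmitSketch();
        sketch.randomEmitWithArray();
        if (sketch.lastChannel < 0 || sketch.lastChannel >= 4) throw new AssertionError();
        sketch.randomEmitDirect();
        if (sketch.lastChannel < 0 || sketch.lastChannel >= 4) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Both variants produce the same result per record; the difference is purely the per-record garbage the array version creates on a hot path.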




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-06 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215613344
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,101 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emitToTargetChannels(record, 
channelSelector.selectChannels(record, numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emitToTargetChannels(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if (copyToTargetBuffers(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer<T> serializer = serializers[targetChannel];
+   private void emitToTargetChannels(T record, int[] targetChannels) 
throws IOException, InterruptedException {
 
 Review comment:
   maybe rename just to `emit`?




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-06 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215611369
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
 */
private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   Optional<BufferBuilder> bufferBuilderOpt = bufferBuilders[targetChannel];
 
 Review comment:
   Rename to `bufferBuilder` (encoding the type in the variable name is not the best practice).
   
   Have you seen throughput improvements by introducing this local variable? If 
not, maybe revert the change?
   
   FYI: When I was writing this code, I didn't see any performance improvement (and I was testing this exact change). Removing one extra read (the second `bufferBuilders[targetChannel]` access will either be optimised out or will be served from CPU caches/registers) usually hardly matters compared to taking locks :(




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-06 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215615035
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -89,77 +88,101 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
+   this.broadcastChannels = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
+   broadcastChannels[i] = i;
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
-   for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
-   }
+   emitToTargetChannels(record, 
channelSelector.selectChannels(record, numChannels));
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
-   for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
-   }
+   emitToTargetChannels(record, broadcastChannels);
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
+   serializer.serializeRecord(record);
+
+   if (copyToTargetBuffers(rng.nextInt(numChannels))) {
+   serializer.prune();
+   }
}
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer<T> serializer = serializers[targetChannel];
+   private void emitToTargetChannels(T record, int[] targetChannels) 
throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
+   boolean pruneAfterCopying = false;
+   for (int channel : targetChannels) {
+   if (copyToTargetBuffers(channel)) {
+   pruneAfterCopying = true;
+   }
+   }
 
-   SerializationResult result = serializer.addRecord(record);
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   if (pruneAfterCopying) {
+   serializer.prune();
+   }
+   }
 
+   /**
+* Copies the intermediate serialization buffer to the BufferBuilder of the target channel, and
+* checks whether to prune the intermediate buffer if the target BufferBuilder is full and the
+* record is complete.
+*
+* @param targetChannel the target channel to get BufferBuilder
+* @return true if the intermediate serialization buffer 
should be pruned
+*/
+   private boolean copyToTargetBuffers(int targetChannel) throws 
IOException, InterruptedException {
 
 Review comment:
   This is the third time I'm looking at this PR, and the third time I had to think for a minute about what this method does. I'm always forgetting that `serializer` is a class field and that this method copies from it.
   
   Maybe rename it to `copyFromSerializerToTargetChannel`? Imo the rename would allow us to drop most of the javadoc and simplify it to just:
```
/**
 * @param targetChannel
 * @return true if the intermediate serialization buffer 
should be pruned
 */
   ```




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-06 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215611481
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
 */
private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   Optional<BufferBuilder> bufferBuilderOpt = bufferBuilders[targetChannel];
+   if (bufferBuilderOpt.isPresent()) {
bufferBuilders[targetChannel] = Optional.empty();
-   numBytesOut.inc(bufferBuilder.finish());
+   numBytesOut.inc(bufferBuilderOpt.get().finish());
numBuffersOut.inc();
}
}
-   
+
/**
 * The {@link BufferBuilder} may already exist if not filled up last 
time, otherwise we need
 * request a new one for this target channel.
 */
-   @Nonnull
private BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   return bufferBuilders[targetChannel].get();
+   Optional<BufferBuilder> bufferBuilderOpt = bufferBuilders[targetChannel];
+   if (bufferBuilderOpt.isPresent()) {
+   return bufferBuilderOpt.get();
} else {
return requestNewBufferBuilder(targetChannel);
}
}
 
-   @Nonnull
private BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
checkState(!bufferBuilders[targetChannel].isPresent());
-
BufferBuilder bufferBuilder = 
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
bufferBuilders[targetChannel] = Optional.of(bufferBuilder);

targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 
targetChannel);
return bufferBuilder;
}
 
private void closeBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   bufferBuilders[targetChannel].get().finish();
+   Optional<BufferBuilder> bufferBuilderOpt = bufferBuilders[targetChannel];
 
 Review comment:
   ditto: rename or inline




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-06 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r215611435
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -197,40 +214,39 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
 */
private void tryFinishCurrentBufferBuilder(int targetChannel) {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   Optional<BufferBuilder> bufferBuilderOpt = bufferBuilders[targetChannel];
+   if (bufferBuilderOpt.isPresent()) {
bufferBuilders[targetChannel] = Optional.empty();
-   numBytesOut.inc(bufferBuilder.finish());
+   numBytesOut.inc(bufferBuilderOpt.get().finish());
numBuffersOut.inc();
}
}
-   
+
/**
 * The {@link BufferBuilder} may already exist if not filled up last 
time, otherwise we need
 * request a new one for this target channel.
 */
-   @Nonnull
private BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
-   if (bufferBuilders[targetChannel].isPresent()) {
-   return bufferBuilders[targetChannel].get();
+   Optional<BufferBuilder> bufferBuilderOpt = bufferBuilders[targetChannel];
 
 Review comment:
   ditto: rename or inline




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-30 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r213949310
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -87,62 +86,71 @@ public RecordWriter(ResultPartitionWriter writer, 
ChannelSelector channelSele
 
this.numChannels = writer.getNumberOfSubpartitions();
 
-   /*
-* The runtime exposes a channel abstraction for the produced 
results
-* (see {@link ChannelSelector}). Every channel has an 
independent
-* serializer.
-*/
-   this.serializers = new SpanningRecordSerializer[numChannels];
+   this.serializer = new SpanningRecordSerializer();
this.bufferBuilders = new Optional[numChannels];
for (int i = 0; i < numChannels; i++) {
-   serializers[i] = new SpanningRecordSerializer();
bufferBuilders[i] = Optional.empty();
}
}
 
public void emit(T record) throws IOException, InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel : channelSelector.selectChannels(record, 
numChannels)) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   // Make sure we don't hold onto the large intermediate 
serialization buffer for too long
+   serializer.prune();
}
 
/**
 * This is used to broadcast Streaming Watermarks in-band with records. 
This ignores
 * the {@link ChannelSelector}.
 */
public void broadcastEmit(T record) throws IOException, 
InterruptedException {
+   serializer.serializeRecord(record);
+
for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
-   sendToTarget(record, targetChannel);
+   copyToTarget(targetChannel);
}
+
+   serializer.prune();
}
 
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, 
InterruptedException {
-   sendToTarget(record, rng.nextInt(numChannels));
-   }
+   serializer.serializeRecord(record);
 
-   private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
-   RecordSerializer<T> serializer = serializers[targetChannel];
+   copyToTarget(rng.nextInt(numChannels));
 
-   SerializationResult result = serializer.addRecord(record);
+   serializer.prune();
+   }
 
+   private void copyToTarget(int targetChannel) throws IOException, 
InterruptedException {
+   // We should reset the initial position of the intermediate 
serialization buffer before
+   // copying, so the serialization results can be copied to 
multiple target buffers.
+   serializer.reset();
+
+   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
+   SerializationResult result = 
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
-   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
-   // If this was a full record, we are done. Not 
breaking
-   // out of the loop at this point will lead to 
another
-   // buffer request before breaking out (that 
would not be
-   // a problem per se, but it can lead to stalls 
in the
-   // pipeline).
-   if (result.isFullRecord()) {
-   break;
-   }
+   tryFinishCurrentBufferBuilder(targetChannel);
 
 Review comment:
   When I was reviewing this part last time, I was only concerned that both `getBufferBuilder(targetChannel)` and `tryFinishCurrentBufferBuilder(targetChannel)` access `bufferBuilders[targetChannel]` twice. However, the performance penalty (if any) shouldn't matter, and I liked the new code more :)
   
   Regarding `finishCurrentBufferBuilder`: `tryFinishCurrentBufferBuilder` is currently also used in one more place (`broadcastEvent()`), so we would need to add an `if` check there preceding the call to `finishCurrentBufferBuilder`. Either way I don't mind; if you prefer one way over the other, feel free to change :)
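A sketch of the split mentioned above, under the assumption (not confirmed in this thread) that it would look roughly like this: an unconditional `finishCurrentBufferBuilder` plus an explicit `isPresent()` guard at the call site that needs it. `StringBuilder` stands in for `BufferBuilder`; all names are illustrative.

```java
import java.util.Optional;

public class FinishSketch {
    private final Optional<StringBuilder>[] bufferBuilders;

    @SuppressWarnings("unchecked")
    FinishSketch(int numChannels) {
        bufferBuilders = new Optional[numChannels];
        for (int i = 0; i < numChannels; i++) {
            bufferBuilders[i] = Optional.empty();
        }
    }

    void open(int channel) {
        bufferBuilders[channel] = Optional.of(new StringBuilder());
    }

    boolean isOpen(int channel) {
        return bufferBuilders[channel].isPresent();
    }

    // Unconditional variant: the caller guarantees a builder is present.
    void finishCurrentBufferBuilder(int channel) {
        StringBuilder builder = bufferBuilders[channel].get();
        bufferBuilders[channel] = Optional.empty();
        builder.append("<finished>"); // stand-in for bufferBuilder.finish()
    }

    // A call site that needs the explicit guard, as broadcastEvent() would.
    void maybeFinish(int channel) {
        if (isOpen(channel)) {
            finishCurrentBufferBuilder(channel);
        }
    }

    public static void main(String[] args) {
        FinishSketch sketch = new FinishSketch(1);
        sketch.maybeFinish(0); // no builder yet: a safe no-op
        sketch.open(0);
        sketch.maybeFinish(0); // finishes and clears the tracked builder
        if (sketch.isOpen(0)) throw new AssertionError();
        System.out.println("ok");
    }
}
```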



[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r210236329
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   So maybe keep it as it is. Maybe in the future someone will be struck by a 
better idea.




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-14 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209921447
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   Re 1.
   
   I was thinking about something along those lines, however I saw the need for `close()`, because currently `SpanningRecordSerializer` reuses the same `byte[]` across multiple records, hidden in the `dataBuffer = serializationBuffer.wrapAsByteBuffer();` call. Is there some other way to release/return this array back to `RecordSerializer`? Or maybe that proves that `RecordSerializer` and `SerializedRecord` should indeed be one class?
   
   Re 2.
   
   I'm not sure. This one seems like it would leak `RecordSerializer`'s private fields to `SerializedRecord`.
   
   All in all, I'm not sure the improvement here is worth the effort, even if it would improve/simplify the code (this `close()` especially bothers me). Also, I agree that your changes in `addRecord`, `continueWritingWithNextBufferBuilder` and `copyToBufferBuilder` actually make this code easier, so I would be fine with merging it as is (modulo my other comments) :)




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-14 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209289603
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
+   final RecordDeserializer deserializer = 
new SpillingAdaptiveSpanningRecordDeserializer<>(
+   new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+   final ArrayDeque serializedRecords = new 
ArrayDeque<>();
+   final Iterable records = 
Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+   for (SerializationTestType record : records) {
+   serializedRecords.add(record);
+
+   if (isBroadcastEmit) {
+   writer.broadcastEmit(record);
+   } else {
+   writer.emit(record);
+   }
+   }
+
+   final int requiredBuffers = numValues / (bufferSize / (4 + 
serializationLength));
+   for (int i = 0; i < numChannels; i++) {
 
 Review comment:
   can you somehow extract the common logic of this method and `SpanningRecordSerializationTest#testSerializationRoundTrip(Iterable, int, RecordSerializer, RecordDeserializer)`? They share a lot of code.




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290889
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   I'm thinking about refactoring this class and splitting it into two:
   ```
   class RecordSerializer {
SerializedRecord serializeRecord(T record);
   };
   
   class SerializedRecord implements Autoclosable {
 CopingResult copyToBufferBuilder(BufferBuilder bufferBuilder);
   
 void close() {
serializer.prune();
// and code to return state (serializationBuffer) to serializer for 
reuse
 }
   }
   ```
   
   and usage:
   ```
public void randomEmit(T record) throws IOException, 
InterruptedException {
try (SerializedRecord serializedRecord = 
serializer.serializeRecord(record)) {
copyToTarget(serializedRecord, 
rng.nextInt(numChannels));
}
}
   ```
   
   Something about the current `RecordSerializer` has somehow always been tickling my brain: it is confusing to me and I always have to check its implementation whenever I revisit the code. Maybe with this split it would be easier to understand? But I'm not sure about this. What do you think?
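A runnable toy version of this proposal, with all types and signatures as stand-ins rather than Flink's actual API: `serializeRecord` returns an `AutoCloseable` handle whose `close()` triggers `prune()`, so try-with-resources manages the intermediate buffer's lifetime.

```java
public class SerializerSplitSketch {
    static class RecordSerializer {
        boolean pruned;

        SerializedRecord serializeRecord(String record) {
            pruned = false; // buffer is in use until the record is closed
            return new SerializedRecord(this, record.getBytes());
        }

        void prune() {
            pruned = true; // return the serialization buffer for reuse
        }
    }

    static class SerializedRecord implements AutoCloseable {
        private final RecordSerializer serializer;
        final byte[] data;

        SerializedRecord(RecordSerializer serializer, byte[] data) {
            this.serializer = serializer;
            this.data = data;
        }

        // Stand-in for copyToBufferBuilder(BufferBuilder); returns bytes copied.
        int copyToBufferBuilder(StringBuilder bufferBuilder) {
            bufferBuilder.append(new String(data));
            return data.length;
        }

        @Override
        public void close() {
            serializer.prune(); // hand the buffer back to the serializer
        }
    }

    public static void main(String[] args) {
        RecordSerializer serializer = new RecordSerializer();
        StringBuilder buffer = new StringBuilder();
        try (SerializedRecord record = serializer.serializeRecord("hello")) {
            record.copyToBufferBuilder(buffer);
        }
        if (!serializer.pruned) throw new AssertionError("prune() not called");
        System.out.println(buffer); // prints "hello"
    }
}
```

The try-with-resources block makes the "prune after copying" contract impossible to forget at call sites, which is the main appeal of the split.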




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290136
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+   private static class Broadcast implements 
ChannelSelector {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   1. do we need to cache `returnChannel`? Does it give any meaningful test 
execution speed up?
   2. if so, instead of using `set` and `setNumber`, just check whether 
`returnChannel.length == numberOfOutputChannels`. If not, create new one.
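Point 2 could look roughly like this (a sketch against a simplified selector shape, not the real `ChannelSelector` interface): drop the `set` flag and rebuild the cached array only when its length no longer matches the requested channel count.

```java
public class BroadcastSelectorSketch {
    private int[] returnChannel = new int[0];

    // Returns all channel indices, caching the array between calls and
    // rebuilding it only when the channel count changes.
    int[] selectChannels(int numberOfOutputChannels) {
        if (returnChannel.length != numberOfOutputChannels) {
            returnChannel = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnChannel[i] = i;
            }
        }
        return returnChannel;
    }

    public static void main(String[] args) {
        BroadcastSelectorSketch selector = new BroadcastSelectorSketch();
        int[] first = selector.selectChannels(3);
        int[] second = selector.selectChannels(3);
        if (first != second) throw new AssertionError("array should be cached");
        if (first[2] != 2) throw new AssertionError();
        System.out.println("ok");
    }
}
```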




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209284512
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -188,24 +192,32 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 
/**
 * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
-*
-* @return true if some data were written
 */
-   private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer serializer) {
-
-   if (!bufferBuilders[targetChannel].isPresent()) {
-   return false;
+   private void tryFinishCurrentBufferBuilder(int targetChannel) {
+   if (bufferBuilders[targetChannel].isPresent()) {
+   BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
+   bufferBuilders[targetChannel] = Optional.empty();
+   numBytesOut.inc(bufferBuilder.finish());
	}
-   BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
-   bufferBuilders[targetChannel] = Optional.empty();
+   }
 
-   numBytesOut.inc(bufferBuilder.finish());
-   serializer.clear();
-   return true;
+   /**
+* The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need
+* request a new one for this target channel.
+*/
+   @Nonnull
 
 Review comment:
   IMO you don't have to add the `@Nonnull` annotation. I implicitly assume that any field not marked `@Nullable` is automatically `@Nonnull`.




[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209289603
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception {
	assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>());
+   final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
+   new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+   final ArrayDeque serializedRecords = new ArrayDeque<>();
+   final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+   for (SerializationTestType record : records) {
+   serializedRecords.add(record);
+
+   if (isBroadcastEmit) {
+   writer.broadcastEmit(record);
+   } else {
+   writer.emit(record);
+   }
+   }
+
+   final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength));
+   for (int i = 0; i < numChannels; i++) {
 
 Review comment:
   Can you somehow extract the common logic of this method and `org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest#testSerializationRoundTrip(java.lang.Iterable, int, org.apache.flink.runtime.io.network.api.serialization.RecordSerializer, org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer)`? They share a lot of code.
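   As a rough illustration of such an extraction, both tests could delegate to a shared round-trip helper along these lines (a simplified, self-contained sketch — `verifyRoundTrip` and its functional parameters are hypothetical, not the Flink test API):
   ```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical shape of the shared logic: push every record through a
// serializer, collect the serialized form, then deserialize and compare
// against the original sequence. Names are illustrative, not Flink's.
public class RoundTripDemo {
	static <T, S> void verifyRoundTrip(List<T> records,
			Function<T, S> serialize,
			Function<S, T> deserialize) {
		Queue<S> serialized = new ArrayDeque<>();
		for (T record : records) {
			serialized.add(serialize.apply(record));
		}
		for (T expected : records) {
			T actual = deserialize.apply(serialized.poll());
			if (!expected.equals(actual)) {
				throw new AssertionError("round trip mismatch: " + expected + " vs " + actual);
			}
		}
	}

	public static void main(String[] args) {
		// Trivial "serializer": Integer -> String and back again.
		verifyRoundTrip(List.of(1, 2, 3),
				i -> Integer.toString(i),
				Integer::parseInt);
		System.out.println("round trip ok");
	}
}
   ```
   The real helper would be parameterized over the `RecordSerializer`/`RecordDeserializer` pair instead of plain functions, but the flow is the same in both tests.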

