[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r327063876 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.api.writer; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; +import org.apache.flink.testutils.serialization.types.SerializationTestType; +import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; +import org.apache.flink.testutils.serialization.types.Util; + +import org.junit.Test; + +import java.util.ArrayDeque; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link BroadcastRecordWriter}. + */ +public class BroadcastRecordWriterTest extends RecordWriterTest { + + public BroadcastRecordWriterTest() { + super(true); + } + + /** +* Tests the number of requested buffers and results are correct in the case of switching +* modes between {@link BroadcastRecordWriter#broadcastEmit(IOReadableWritable)} and +* {@link BroadcastRecordWriter#randomEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastMixedRandomEmitRecord() throws Exception { Review comment: I couldn't understand the test after trying 3 times. Could you please add some comments in between? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r327062898 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -225,46 +178,56 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { numBuffersOut = metrics.getNumBuffersOutCounter(); } - /** -* Marks the current {@link BufferBuilder} as finished and clears the state for next one. -*/ - private void tryFinishCurrentBufferBuilder(int targetChannel) { - if (!bufferBuilders[targetChannel].isPresent()) { - return; - } - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); - bufferBuilders[targetChannel] = Optional.empty(); + protected void finishBufferBuilder(BufferBuilder bufferBuilder) { numBytesOut.inc(bufferBuilder.finish()); numBuffersOut.inc(); } + /** +* This is used to send regular records. +*/ + public abstract void emit(T record) throws IOException, InterruptedException; + + /** +* This is used to send LatencyMarks to a random target channel. +*/ + public abstract void randomEmit(T record) throws IOException, InterruptedException; + + /** +* This is used to broadcast streaming Watermarks in-band with records. +*/ + public abstract void broadcastEmit(T record) throws IOException, InterruptedException; + /** * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need * request a new one for this target channel. 
*/ - private BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - if (bufferBuilders[targetChannel].isPresent()) { - return bufferBuilders[targetChannel].get(); - } else { - return requestNewBufferBuilder(targetChannel); - } - } + abstract BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException; - private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException { - checkState(!bufferBuilders[targetChannel].isPresent() || bufferBuilders[targetChannel].get().isFinished()); + /** +* Requests a new {@link BufferBuilder} for the target channel and returns it. +*/ + abstract BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException; - BufferBuilder bufferBuilder = targetPartition.getBufferBuilder(); - bufferBuilders[targetChannel] = Optional.of(bufferBuilder); - targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel); - return bufferBuilder; - } + /** +* Marks the current {@link BufferBuilder} as finished if present and clears the state for next one. +*/ + abstract void tryFinishCurrentBufferBuilder(int targetChannel); - private void closeBufferBuilder(int targetChannel) { - if (bufferBuilders[targetChannel].isPresent()) { - bufferBuilders[targetChannel].get().finish(); - bufferBuilders[targetChannel] = Optional.empty(); - } - } + /** +* Marks the current {@link BufferBuilder} as empty for the target channel. +*/ + abstract void emptyCurrentBufferBuilder(int targetChannel); + + /** +* Marks the current {@link BufferBuilder} as finished and empty for the target channel. Review comment: suggestion: Marks the current {@link BufferBuilder} as finished and releases the resources for the target channel. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r327062516 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.writer; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A regular record-oriented runtime result writer. + * + * The ChannelSelectorRecordWriter extends the {@link RecordWriter} and maintains an array of + * {@link BufferBuilder}s for all the channels. The {@link #emit(IOReadableWritable)} + * operation is based on {@link ChannelSelector} to select the target channel. 
+ * + * @param <T> the type of the record that can be emitted with this record writer + */ +public class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> { Review comment: Should this class be final, for JIT optimization? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r327062444 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java ## @@ -18,30 +18,163 @@ package org.apache.flink.runtime.io.network.api.writer; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; /** * A special record-oriented runtime result writer only for broadcast mode. * - * The BroadcastRecordWriter extends the {@link RecordWriter} and handles {@link #emit(IOReadableWritable)} - * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more efficient way. + * The BroadcastRecordWriter extends the {@link RecordWriter} and maintain a single {@link BufferBuilder} + * for all the channels. Then the serialization results need be copied only once to this buffer which would be + * shared for all the channels in a more efficient way. * * @param the type of the record that can be emitted with this record writer */ public class BroadcastRecordWriter extends RecordWriter { Review comment: final for JIT optimization? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r327062031 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -54,23 +54,17 @@ * * @param <T> the type of the record that can be emitted with this record writer */ -public class RecordWriter<T extends IOReadableWritable> { +public abstract class RecordWriter<T extends IOReadableWritable> { private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class); - private final ResultPartitionWriter targetPartition; + protected final ResultPartitionWriter targetPartition; - private final ChannelSelector<T> channelSelector; + protected final int numberOfChannels; Review comment: Any reason to switch the order of fields here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r327061719 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.writer; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A regular record-oriented runtime result writer. + * + * The ChannelSelectorRecordWriter extends the {@link RecordWriter} and maintains an array of + * {@link BufferBuilder}s for all the channels. The {@link #emit(IOReadableWritable)} + * operation is based on {@link ChannelSelector} to select the target channel. 
+ * + * @param <T> the type of the record that can be emitted with this record writer + */ +public class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> { + + private final ChannelSelector<T> channelSelector; + + private final Optional<BufferBuilder>[] bufferBuilders; Review comment: See above: no Optional fields. Also, mixing generics and arrays is not recommended. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode
AHeise commented on a change in pull request #7713: [FLINK-10995][runtime] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r327059906 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java ## @@ -18,30 +18,163 @@ package org.apache.flink.runtime.io.network.api.writer; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; /** * A special record-oriented runtime result writer only for broadcast mode. * - * The BroadcastRecordWriter extends the {@link RecordWriter} and handles {@link #emit(IOReadableWritable)} - * operation via {@link #broadcastEmit(IOReadableWritable)} directly in a more efficient way. + * The BroadcastRecordWriter extends the {@link RecordWriter} and maintain a single {@link BufferBuilder} + * for all the channels. Then the serialization results need be copied only once to this buffer which would be + * shared for all the channels in a more efficient way. * * @param the type of the record that can be emitted with this record writer */ public class BroadcastRecordWriter extends RecordWriter { + /** The current buffer builder shared for all the channels. */ + private Optional bufferBuilder = Optional.empty(); Review comment: Optional fields are [discouraged](http://mail-archives.apache.org/mod_mbox/flink-dev/201908.mbox/%3CCAA_61Xo9oif8RjJgvxFNo%2Bua7_DoFksCy_5c_NOnQLWfS4-8qA%40mail.gmail.com%3E). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services