[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
URL: https://github.com/apache/flink/pull/7199#discussion_r251352161

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##

```diff
@@ -68,20 +68,29 @@
 	private final boolean flushAlways;
+	private final boolean isBroadcastSelector;
+
 	private Counter numBytesOut = new SimpleCounter();
 	private Counter numBuffersOut = new SimpleCounter();
 	public RecordWriter(ResultPartitionWriter writer) {
-		this(writer, new RoundRobinChannelSelector());
+		this(writer, new RoundRobinChannelSelector(), false);
 	}
-	@SuppressWarnings("unchecked")
-	public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
-		this(writer, channelSelector, false);
+	public RecordWriter(
```

Review comment:
Yes, in the long term we might not need a `BroadcastPartitioner` implementation at all, because we never call any partitioner-specific method on it; that would also remove the current, ugly `UnsupportedOperationException` workaround. However, it would require refactoring the DataStream code path that generates the `BroadcastPartitioner`, so for now I would take the simpler `ChannelSelector#isBroadcast` approach. :)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
With regards,
Apache Git Services
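A minimal sketch of the `ChannelSelector#isBroadcast` idea from the comment, assuming simplified stand-in types: the `ChannelSelector`, `RoundRobinChannelSelector`, `BroadcastSelector`, and `SimpleRecordWriter` below are illustrative only and do not reproduce Flink's actual classes or signatures. The writer derives its broadcast flag from the selector instead of receiving a separate boolean, while the broadcast selector still has to throw `UnsupportedOperationException` from `selectChannel`, which is the awkward part the comment mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the refactored selector interface (not Flink's exact API).
interface ChannelSelector<T> {
    void setup(int numberOfChannels);

    // Single selected channel per record.
    int selectChannel(T record);

    // Assumed flag: true if every record goes to all channels.
    boolean isBroadcast();
}

class RoundRobinChannelSelector<T> implements ChannelSelector<T> {
    private int numberOfChannels;
    private int next = -1;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(T record) {
        next = (next + 1) % numberOfChannels;
        return next;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }
}

class BroadcastSelector<T> implements ChannelSelector<T> {
    @Override
    public void setup(int numberOfChannels) {}

    @Override
    public int selectChannel(T record) {
        // A broadcast selector has no single channel to return.
        throw new UnsupportedOperationException("Broadcast selector does not select a single channel.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }
}

class SimpleRecordWriter<T> {
    private final ChannelSelector<T> selector;
    private final int numberOfChannels;
    // Derived from the selector, so it cannot disagree with it.
    private final boolean isBroadcastSelector;

    SimpleRecordWriter(int numberOfChannels, ChannelSelector<T> selector) {
        this.numberOfChannels = numberOfChannels;
        this.selector = selector;
        this.isBroadcastSelector = selector.isBroadcast();
        selector.setup(numberOfChannels);
    }

    // Returns the channels a record would be written to.
    List<Integer> targetChannels(T record) {
        List<Integer> channels = new ArrayList<>();
        if (isBroadcastSelector) {
            for (int i = 0; i < numberOfChannels; i++) {
                channels.add(i);
            }
        } else {
            channels.add(selector.selectChannel(record));
        }
        return channels;
    }
}
```

With this shape, callers such as `BatchTask` and `StreamTask` only pass a selector; the writer never receives a boolean that could contradict it.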
[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
URL: https://github.com/apache/flink/pull/7199#discussion_r250484212

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##

Review comment:
I am still considering how to solve this issue. I agree that it seems better to introduce either a builder method or a `RecordWriterBuilder` class for creating the specific `RecordWriter`. But checking the `ChannelSelector` instance does not seem feasible in the current architecture: `BatchTask` and `StreamTask` live in different modules, so an `instanceof BroadcastPartitioner` check would only work in the streaming module, not for `BatchTask` in the runtime module.

There might be two ways to address this:
1. Introduce a `ChannelSelector#isBroadcast` method to resolve the module dependency.
2. Check the broadcast mode separately in the `StreamTask` and `BatchTask` classes, which means there would be no unified builder for creating a `RecordWriter`.

I slightly prefer the second way, so as not to change the interface much, although the instance check would then be scattered across two different places. What do you think?
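A rough sketch of option 1, assuming simplified stand-in types: a hypothetical `RecordWriterBuilder` chooses the writer type by asking the selector's `isBroadcast()` rather than doing `instanceof BroadcastPartitioner`, so code in the runtime module would not need to see any streaming-module class. `HashSelector`, `AllChannelsSelector`, and the builder itself are illustrative names, not Flink's API.

```java
// Simplified stand-in, reduced to what the builder needs.
interface ChannelSelector {
    int selectChannel(Object record, int numberOfChannels);

    // Assumed method: lets runtime code detect broadcast mode without instanceof.
    boolean isBroadcast();
}

class HashSelector implements ChannelSelector {
    @Override
    public int selectChannel(Object record, int numberOfChannels) {
        return Math.floorMod(record.hashCode(), numberOfChannels);
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }
}

class AllChannelsSelector implements ChannelSelector {
    @Override
    public int selectChannel(Object record, int numberOfChannels) {
        throw new UnsupportedOperationException("Broadcast has no single target channel.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }
}

class RecordWriter {
    final ChannelSelector selector;

    RecordWriter(ChannelSelector selector) {
        this.selector = selector;
    }
}

class BroadcastRecordWriter extends RecordWriter {
    BroadcastRecordWriter(ChannelSelector selector) {
        super(selector);
    }
}

// Hypothetical builder that both BatchTask and StreamTask could share,
// because it depends only on the ChannelSelector interface.
final class RecordWriterBuilder {
    private ChannelSelector selector;

    RecordWriterBuilder setChannelSelector(ChannelSelector selector) {
        this.selector = selector;
        return this;
    }

    RecordWriter build() {
        if (selector == null) {
            throw new IllegalStateException("Channel selector must be set.");
        }
        return selector.isBroadcast() ? new BroadcastRecordWriter(selector) : new RecordWriter(selector);
    }
}
```

Option 2 would instead place an `instanceof` check in each task class and drop the shared builder; the trade-off is an interface change versus duplicated checks.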
[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
URL: https://github.com/apache/flink/pull/7199#discussion_r245596162

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##

Review comment:
Yes, my earlier idea is very similar to your demo above. `BroadcastRecordWriter` can extend `RecordWriter` and optimize the emit path specifically for broadcast, which avoids an `if` check on every emit. And it should not be very tricky to create either a `RecordWriter` or a `BroadcastRecordWriter` based on the `ChannelSelector` instance. I can try this approach when I update the code later. :)
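A sketch of the subclassing idea under simplified assumptions: `BroadcastRecordWriter` overrides `emit`, so the broadcast decision is made once, when the writer is constructed, instead of by an `if` on every record. The serialize-once optimization and the in-memory byte-array "channels" are illustrative assumptions, not Flink's real buffer handling, and the two-argument `selectChannel` is a hypothetical signature.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a single-channel selector (hypothetical signature).
interface ChannelSelector<T> {
    int selectChannel(T record, int numberOfChannels);
}

class RecordWriter<T> {
    // One in-memory list of serialized records per output channel.
    private final List<List<byte[]>> channels = new ArrayList<>();
    private final ChannelSelector<T> selector;
    // Counts serializations so the broadcast optimization is observable.
    private int serializations;

    RecordWriter(int numberOfChannels, ChannelSelector<T> selector) {
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
        this.selector = selector;
    }

    // Common path: exactly one selected channel, no per-record broadcast check.
    void emit(T record) {
        write(serialize(record), selector.selectChannel(record, channels.size()));
    }

    protected byte[] serialize(T record) {
        serializations++;
        return String.valueOf(record).getBytes(StandardCharsets.UTF_8);
    }

    protected void write(byte[] bytes, int channel) {
        channels.get(channel).add(bytes);
    }

    protected int numberOfChannels() {
        return channels.size();
    }

    int recordsIn(int channel) {
        return channels.get(channel).size();
    }

    int serializations() {
        return serializations;
    }
}

// Broadcast specialization: chosen once at construction, so emit() needs no if-check.
class BroadcastRecordWriter<T> extends RecordWriter<T> {
    BroadcastRecordWriter(int numberOfChannels) {
        // The selector is never consulted because emit() is overridden.
        super(numberOfChannels, (record, n) -> {
            throw new UnsupportedOperationException("Broadcast writer does not select channels.");
        });
    }

    @Override
    void emit(T record) {
        byte[] bytes = serialize(record); // serialize once
        for (int i = 0; i < numberOfChannels(); i++) {
            write(bytes, i); // reuse the same bytes for every channel
        }
    }
}
```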
[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel
URL: https://github.com/apache/flink/pull/7199#discussion_r241289363

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ##

Review comment:
To be honest, I am also not very satisfied with the current implementation. It does have the redundancy and potential inconsistency issues you mentioned.

The first way you mentioned would solve these issues if `RecordWriter` can check internally whether the selector is a broadcast selector. For the second way, we cannot pass only the boolean argument without a selector: whenever the boolean is false, we must rely on the specific selector to choose channels. So the selector has to be given explicitly, while the boolean can be derived implicitly from the selector.

I also thought of another way earlier: defining a separate `BroadcastRecordWriter` that handles broadcasting directly. The writers would then be split into the common `RecordWriter` and the special `BroadcastRecordWriter`. The common `RecordWriter` needs the selector to route records to channels, while `BroadcastRecordWriter` does not need a selector at all.

To do so we may need to further extract the code shared by these two writers, and we may also need to remove the current `StreamRecordWriter` by merging it directly into `RecordWriter`. The writers would then be divided by broadcast mode rather than by streaming mode. What do you think of this approach? I will also think through the changes for the two options above.
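A minimal sketch of the proposed split, assuming hypothetical names (`AbstractRecordWriter`, a two-argument `selectChannel`) and in-memory channels rather than Flink's real buffers. Shared code, including the flush policy (`flushAlways`), moves into an abstract base, so stream-specific behavior no longer needs its own writer class. The non-broadcast `RecordWriter` requires a selector, and `BroadcastRecordWriter` takes none, so there is no boolean/selector pair that could disagree.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical common base holding what both writers share.
abstract class AbstractRecordWriter<T> {
    private final List<List<T>> channels = new ArrayList<>();
    private final boolean flushAlways;
    // Counts simulated flushes (one per write when flushAlways is set).
    private int flushCount;

    protected AbstractRecordWriter(int numberOfChannels, boolean flushAlways) {
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
        this.flushAlways = flushAlways;
    }

    // Subclasses only decide how a record is routed.
    abstract void emit(T record);

    protected int numberOfChannels() {
        return channels.size();
    }

    protected void writeToChannel(T record, int channel) {
        channels.get(channel).add(record);
        if (flushAlways) {
            flushCount++;
        }
    }

    List<T> channel(int index) {
        return channels.get(index);
    }

    int flushCount() {
        return flushCount;
    }
}

// Simplified stand-in selector (hypothetical signature).
interface ChannelSelector<T> {
    int selectChannel(T record, int numberOfChannels);
}

// Non-broadcast writer: the selector is mandatory because it routes every record.
class RecordWriter<T> extends AbstractRecordWriter<T> {
    private final ChannelSelector<T> selector;

    RecordWriter(int numberOfChannels, boolean flushAlways, ChannelSelector<T> selector) {
        super(numberOfChannels, flushAlways);
        if (selector == null) {
            throw new NullPointerException("selector");
        }
        this.selector = selector;
    }

    @Override
    void emit(T record) {
        writeToChannel(record, selector.selectChannel(record, numberOfChannels()));
    }
}

// Broadcast writer: needs no selector at all.
class BroadcastRecordWriter<T> extends AbstractRecordWriter<T> {
    BroadcastRecordWriter(int numberOfChannels, boolean flushAlways) {
        super(numberOfChannels, flushAlways);
    }

    @Override
    void emit(T record) {
        for (int i = 0; i < numberOfChannels(); i++) {
            writeToChannel(record, i);
        }
    }
}
```

The design choice here is that the class hierarchy encodes the routing mode, so neither writer has to branch on a mode flag at emit time.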