[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel

2019-01-28 Thread GitBox
URL: https://github.com/apache/flink/pull/7199#discussion_r251352161
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -68,20 +68,29 @@
 
private final boolean flushAlways;
 
+   private final boolean isBroadcastSelector;
+
private Counter numBytesOut = new SimpleCounter();
 
private Counter numBuffersOut = new SimpleCounter();
 
public RecordWriter(ResultPartitionWriter writer) {
-   this(writer, new RoundRobinChannelSelector());
+   this(writer, new RoundRobinChannelSelector(), false);
}
 
-   @SuppressWarnings("unchecked")
-   public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
-   this(writer, channelSelector, false);
+   public RecordWriter(
 
 Review comment:
   Yes, in the long term we might not need the `BroadcastPartitioner` 
implementation at all, because we do not need any specific methods from this 
partitioner. That would also let us avoid the current, ugly 
`UnsupportedOperationException` approach.
   
   However, that would require refactoring the DataStream code path that 
generates `BroadcastPartitioner`, so for now I would take the simpler 
`ChannelSelector#isBroadcast` approach. :)
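   A minimal sketch of the `ChannelSelector#isBroadcast` idea, under 
simplifying assumptions: the interface below and the `RoundRobinSelector` and 
`BroadcastSelector` classes are hypothetical stand-ins, not Flink's actual API. 
It shows why the broadcast case is awkward: a broadcast selector never routes a 
record to a single channel, so its `selectChannel` can only throw 
`UnsupportedOperationException`.

```java
/** Hypothetical, simplified channel selector that can report broadcast mode. */
interface ChannelSelector<T> {
    /** Called once with the number of output channels. */
    void setup(int numberOfChannels);

    /** Returns the single channel index the record should be written to. */
    int selectChannel(T record);

    /** Returns true if every record is written to all channels. */
    boolean isBroadcast();
}

/** Hypothetical round-robin selector: cycles through channels 0..n-1. */
class RoundRobinSelector<T> implements ChannelSelector<T> {
    private int numberOfChannels;
    private int nextChannel = -1;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(T record) {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }
}

/** Hypothetical broadcast selector: only the flag matters, single-channel selection is unsupported. */
class BroadcastSelector<T> implements ChannelSelector<T> {
    @Override
    public void setup(int numberOfChannels) {
        // Nothing to configure: a broadcast writer targets every channel.
    }

    @Override
    public int selectChannel(T record) {
        throw new UnsupportedOperationException("Broadcast selector does not select single channels.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }
}
```

   Dropping the broadcast selector class entirely, as suggested for the long 
term, would remove this throwing method, at the cost of changing how the 
DataStream code path generates `BroadcastPartitioner`.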


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel

2019-01-23 Thread GitBox
URL: https://github.com/apache/flink/pull/7199#discussion_r250484212
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -68,20 +68,29 @@
 
private final boolean flushAlways;
 
+   private final boolean isBroadcastSelector;
+
private Counter numBytesOut = new SimpleCounter();
 
private Counter numBuffersOut = new SimpleCounter();
 
public RecordWriter(ResultPartitionWriter writer) {
-   this(writer, new RoundRobinChannelSelector());
+   this(writer, new RoundRobinChannelSelector(), false);
}
 
-   @SuppressWarnings("unchecked")
-   public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
-   this(writer, channelSelector, false);
+   public RecordWriter(
 
 Review comment:
   I am considering how to solve this issue.
   
   I agree it would be better to introduce either a builder method or a 
`RecordWriterBuilder` class for creating the specific `RecordWriter`. But 
checking the `ChannelSelector` instance seems infeasible in the current 
architecture: `BatchTask` and `StreamTask` live in different modules, and an 
`instanceof BroadcastPartitioner` check would only work in the streaming 
module, not for `BatchTask` in the runtime module.
   
   There are two possible ways to handle this:
   1. Introduce a `ChannelSelector#isBroadcast` method to resolve the module 
dependency.
   2. Check the broadcast mode separately in the `StreamTask` and `BatchTask` 
classes, which means there is no unified builder for creating a `RecordWriter`.
   
   I slightly prefer the second way because it avoids changing the interface 
much, although the instance check would then be scattered across two places. 
What do you think?
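   To illustrate option 1, here is a hypothetical sketch of a single factory 
that could live in the runtime module. Because it only asks the selector 
`isBroadcast()`, neither `BatchTask` nor `StreamTask` needs an 
`instanceof BroadcastPartitioner` check. `RecordWriterFactory`, the simplified 
`ChannelSelector`, and the two writer classes are illustrative stand-ins, not 
Flink's real classes.

```java
/** Hypothetical stand-in for a channel selector that can report broadcast mode. */
interface ChannelSelector<T> {
    int selectChannel(T record);

    boolean isBroadcast();
}

/** Stand-in for the common writer, which routes each record via the selector. */
class RecordWriter<T> {
    final ChannelSelector<T> channelSelector;

    RecordWriter(ChannelSelector<T> channelSelector) {
        this.channelSelector = channelSelector;
    }
}

/** Stand-in for a writer specialized for broadcast, which never consults the selector per record. */
class BroadcastRecordWriter<T> extends RecordWriter<T> {
    BroadcastRecordWriter(ChannelSelector<T> channelSelector) {
        super(channelSelector);
    }
}

/** Hypothetical factory usable from both the batch and the streaming task. */
final class RecordWriterFactory {
    private RecordWriterFactory() {}

    static <T> RecordWriter<T> create(ChannelSelector<T> selector) {
        // The decision relies only on the interface method, so no module has to
        // know about a concrete partitioner class such as BroadcastPartitioner.
        if (selector.isBroadcast()) {
            return new BroadcastRecordWriter<T>(selector);
        }
        return new RecordWriter<T>(selector);
    }
}
```

   Option 2 would instead put an equivalent `instanceof` check in each of 
`StreamTask` and `BatchTask`, keeping the interface unchanged at the cost of 
duplicating the decision.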




[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel

2019-01-07 Thread GitBox
URL: https://github.com/apache/flink/pull/7199#discussion_r245596162
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -68,20 +68,29 @@
 
private final boolean flushAlways;
 
+   private final boolean isBroadcastSelector;
+
private Counter numBytesOut = new SimpleCounter();
 
private Counter numBuffersOut = new SimpleCounter();
 
public RecordWriter(ResultPartitionWriter writer) {
-   this(writer, new RoundRobinChannelSelector());
+   this(writer, new RoundRobinChannelSelector(), false);
}
 
-   @SuppressWarnings("unchecked")
-   public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
-   this(writer, channelSelector, false);
+   public RecordWriter(
 
 Review comment:
   Yes, my earlier idea is very similar to your demo above.
   
   `BroadcastRecordWriter` can extend `RecordWriter` and optimize the emit path 
specifically for broadcast, which avoids an `if` check on every emit. Creating 
either a `RecordWriter` or a `BroadcastRecordWriter` based on the 
`ChannelSelector` instance also does not seem very tricky.
   
   I will try updating the code this way later. :)
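   A hedged sketch of the subclassing idea: the base writer asks the selector 
for a target channel on every record, while the broadcast subclass overrides 
`emit` to write the record to all channels, so neither path carries a 
per-record broadcast check. Channels are modeled as in-memory lists, and the 
classes are simplified stand-ins; serialization, buffers, and flushing in the 
real `RecordWriter` are omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simplified selector: picks one channel index per record. */
interface ChannelSelector<T> {
    int selectChannel(T record, int numberOfChannels);
}

/** Base writer: routes each record to exactly one channel chosen by the selector. */
class RecordWriter<T> {
    protected final List<List<T>> channels = new ArrayList<>();
    private final ChannelSelector<T> channelSelector;

    RecordWriter(int numberOfChannels, ChannelSelector<T> channelSelector) {
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<T>());
        }
        this.channelSelector = channelSelector;
    }

    public void emit(T record) {
        int target = channelSelector.selectChannel(record, channels.size());
        channels.get(target).add(record);
    }

    public List<T> channel(int index) {
        return channels.get(index);
    }
}

/** Broadcast writer: needs no selector, every record goes to all channels. */
class BroadcastRecordWriter<T> extends RecordWriter<T> {
    BroadcastRecordWriter(int numberOfChannels) {
        // The selector is never consulted on this path, so none is passed.
        super(numberOfChannels, null);
    }

    @Override
    public void emit(T record) {
        for (List<T> channel : channels) {
            channel.add(record);
        }
    }
}
```

   Here the broadcast decision is made once, when the writer is constructed, 
rather than on every record.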




[GitHub] zhijiangW commented on a change in pull request #7199: [FLINK-10662][network] Refactor the ChannelSelector interface for single selected channel

2018-12-12 Thread GitBox
URL: https://github.com/apache/flink/pull/7199#discussion_r241289363
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -68,20 +68,29 @@
 
private final boolean flushAlways;
 
+   private final boolean isBroadcastSelector;
+
private Counter numBytesOut = new SimpleCounter();
 
private Counter numBuffersOut = new SimpleCounter();
 
public RecordWriter(ResultPartitionWriter writer) {
-   this(writer, new RoundRobinChannelSelector());
+   this(writer, new RoundRobinChannelSelector(), false);
}
 
-   @SuppressWarnings("unchecked")
-   public RecordWriter(ResultPartitionWriter writer, ChannelSelector channelSelector) {
-   this(writer, channelSelector, false);
+   public RecordWriter(
 
 Review comment:
   To be honest, I am also a bit uncomfortable with the current implementation. 
It does have the redundancy and potential inconsistency issues you mentioned.
   
   The first way you mentioned can solve these issues if we can check 
internally in `RecordWriter` whether the selector is a broadcast one.
   
   For the second way, we cannot pass only the boolean argument without the 
selector, because when the boolean is false we still rely on the specific 
selector to choose channels. So the selector has to be given explicitly, while 
the boolean can be derived implicitly from the selector.
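   The redundancy point can be sketched as follows, assuming the selector 
exposes a hypothetical `isBroadcast()` method: if the constructor took both a 
selector and a separate boolean, a caller could pass a single-channel selector 
together with `true`. Deriving the flag from the selector leaves one source of 
truth. The class names and the `targetChannels` helper are simplified, 
hypothetical stand-ins for the code in the diff.

```java
/** Hypothetical selector; non-broadcast by default. */
interface ChannelSelector<T> {
    int selectChannel(T record, int numberOfChannels);

    default boolean isBroadcast() {
        return false;
    }
}

/** Simplified writer that derives the broadcast flag from its selector. */
class RecordWriter<T> {
    private final int numberOfChannels;
    private final ChannelSelector<T> channelSelector;
    // Derived from the selector instead of being passed in, so the two cannot disagree.
    private final boolean isBroadcastSelector;

    RecordWriter(int numberOfChannels, ChannelSelector<T> channelSelector) {
        this.numberOfChannels = numberOfChannels;
        this.channelSelector = channelSelector;
        this.isBroadcastSelector = channelSelector.isBroadcast();
    }

    /** Hypothetical helper: returns the channel indices the record would be written to. */
    int[] targetChannels(T record) {
        if (isBroadcastSelector) {
            int[] all = new int[numberOfChannels];
            for (int i = 0; i < numberOfChannels; i++) {
                all[i] = i;
            }
            return all;
        }
        // Only non-broadcast selectors are asked for a single channel.
        return new int[] {channelSelector.selectChannel(record, numberOfChannels)};
    }
}
```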
   
   I also thought of another way before: defining a separate 
`BroadcastRecordWriter` that handles broadcasting directly. Writers would then 
be split into the common `RecordWriter` and the special `BroadcastRecordWriter`. 
The common `RecordWriter` needs the selector for routing records to channels, 
while `BroadcastRecordWriter` does not need a selector at all. To do so, we may 
need to further extract the code shared by these two writers, and we may also 
need to remove the current `StreamRecordWriter` by merging it directly into the 
current `RecordWriter`. Writers would then be divided by broadcast mode rather 
than by stream mode. What do you think of this approach?
   
   I will also think further about the changes required by the above two options.

