[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-08-02 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r310306973
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,49 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
+
+   final ResultPartitionID localResultPartitionId = 
localResultPartition.getPartitionId();
+
+   final RemoteInputChannel remoteInputChannel = 
InputChannelBuilder.newBuilder()
+   .setChannelIndex(1)
+   .setupFromNettyShuffleEnvironment(network)
+   .setConnectionManager(new TestingConnectionManager())
+   .buildRemoteAndSetToGate(inputGate);
+
+   InputChannelBuilder.newBuilder()
+   .setChannelIndex(0)
+   .setPartitionId(localResultPartitionId)
+   .setupFromNettyShuffleEnvironment(network)
+   .setConnectionManager(new TestingConnectionManager())
+   .buildLocalAndSetToGate(inputGate);
+
+   try {
+   localResultPartition.setup();
+   inputGate.setup();
+
+   
remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
+   assertEquals(1, inputGate.getNumberOfQueuedBuffers());
+
+   
localResultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledBufferConsumer(1),
 0);
+   assertEquals(2, inputGate.getNumberOfQueuedBuffers());
+   } finally {
+   localResultPartition.release();
+   inputGate.close();
+   network.close();
+   }
+
 
 Review comment:
   nit: remove this empty line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-08-02 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r310307040
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,49 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
 
 Review comment:
   localResultPartition -> resultPartition


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304776778
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   localResultPartition.setup();
+   remoteResultPartition.setup();
+
+   final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
+
+   try {
+   final ResultPartitionID localResultPartitionId = 
localResultPartition.getPartitionId();
+   addUnknownInputChannel(network, inputGate, 
localResultPartitionId, 0);
+   LocalInputChannel localInputChannel = 
InputChannelBuilder.newBuilder()
+   .setChannelIndex(0)
+   .setPartitionId(localResultPartitionId)
+   .setupFromNettyShuffleEnvironment(network)
+   .setConnectionManager(new 
TestingConnectionManager())
+   .buildLocalAndSetToGate(inputGate);
+
+   final ResultPartitionID remoteResultPartitionId = 
remoteResultPartition.getPartitionId();
+   RemoteInputChannel remoteInputChannel = 
InputChannelBuilder.newBuilder()
 
 Review comment:
   also mark it final for consistency


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304776447
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   localResultPartition.setup();
+   remoteResultPartition.setup();
+
+   final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
+
+   try {
+   final ResultPartitionID localResultPartitionId = 
localResultPartition.getPartitionId();
+   addUnknownInputChannel(network, inputGate, 
localResultPartitionId, 0);
+   LocalInputChannel localInputChannel = 
InputChannelBuilder.newBuilder()
+   .setChannelIndex(0)
+   .setPartitionId(localResultPartitionId)
+   .setupFromNettyShuffleEnvironment(network)
+   .setConnectionManager(new 
TestingConnectionManager())
+   .buildLocalAndSetToGate(inputGate);
+
+   final ResultPartitionID remoteResultPartitionId = 
remoteResultPartition.getPartitionId();
+   RemoteInputChannel remoteInputChannel = 
InputChannelBuilder.newBuilder()
+   .setChannelIndex(1)
+   .setPartitionId(remoteResultPartitionId)
+   .setupFromNettyShuffleEnvironment(network)
+   .setConnectionManager(new 
TestingConnectionManager())
+   .buildRemoteAndSetToGate(inputGate);
+
+   inputGate.setup();
+
+   
remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
+   assertEquals(1, inputGate.getNumberOfQueuedBuffers());
+
+   
localResultPartition.addBufferConsumer(BufferBuilderTestUtils.createBufferBuilder(1).createBufferConsumer(),
 0);
+   assertEquals(2, inputGate.getNumberOfQueuedBuffers());
+   } finally {
+   inputGate.close();
 
 Review comment:
   also release partition via `resultPartition.release()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304776032
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   localResultPartition.setup();
+   remoteResultPartition.setup();
+
+   final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
+
+   try {
+   final ResultPartitionID localResultPartitionId = 
localResultPartition.getPartitionId();
+   addUnknownInputChannel(network, inputGate, 
localResultPartitionId, 0);
+   LocalInputChannel localInputChannel = 
InputChannelBuilder.newBuilder()
+   .setChannelIndex(0)
+   .setPartitionId(localResultPartitionId)
+   .setupFromNettyShuffleEnvironment(network)
+   .setConnectionManager(new 
TestingConnectionManager())
+   .buildLocalAndSetToGate(inputGate);
+
+   final ResultPartitionID remoteResultPartitionId = 
remoteResultPartition.getPartitionId();
+   RemoteInputChannel remoteInputChannel = 
InputChannelBuilder.newBuilder()
+   .setChannelIndex(1)
+   .setPartitionId(remoteResultPartitionId)
+   .setupFromNettyShuffleEnvironment(network)
+   .setConnectionManager(new 
TestingConnectionManager())
+   .buildRemoteAndSetToGate(inputGate);
+
+   inputGate.setup();
+
+   
remoteInputChannel.onBuffer(TestBufferFactory.createBuffer(1), 0, 0);
+   assertEquals(1, inputGate.getNumberOfQueuedBuffers());
+
+   
localResultPartition.addBufferConsumer(BufferBuilderTestUtils.createBufferBuilder(1).createBufferConsumer(),
 0);
 
 Review comment:
   Might use `BufferBuilderTestUtils .createFilledBufferConsumer(1)` directly 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304775304
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   localResultPartition.setup();
 
 Review comment:
   move this setup in `try` clause together with below `inputGate.setup()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304775096
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   localResultPartition.setup();
+   remoteResultPartition.setup();
+
+   final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
+
+   try {
+   final ResultPartitionID localResultPartitionId = 
localResultPartition.getPartitionId();
+   addUnknownInputChannel(network, inputGate, 
localResultPartitionId, 0);
+   LocalInputChannel localInputChannel = 
InputChannelBuilder.newBuilder()
+   .setChannelIndex(0)
+   .setPartitionId(localResultPartitionId)
+   .setupFromNettyShuffleEnvironment(network)
+   .setConnectionManager(new 
TestingConnectionManager())
+   .buildLocalAndSetToGate(inputGate);
+
+   final ResultPartitionID remoteResultPartitionId = 
remoteResultPartition.getPartitionId();
+   RemoteInputChannel remoteInputChannel = 
InputChannelBuilder.newBuilder()
 
 Review comment:
   Move construct `remoteInputChannel` and above `localInputChannel` outside of 
`try` clause. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304773452
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   localResultPartition.setup();
+   remoteResultPartition.setup();
+
+   final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
+
+   try {
+   final ResultPartitionID localResultPartitionId = 
localResultPartition.getPartitionId();
+   addUnknownInputChannel(network, inputGate, 
localResultPartitionId, 0);
+   LocalInputChannel localInputChannel = 
InputChannelBuilder.newBuilder()
 
 Review comment:
   no need to define a local var for `localInputChannel`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304772727
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
 
 Review comment:
   no need to create `remoteResultPartition`, we can delete it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304772107
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
 
 Review comment:
   no need to create `localResultPartition`, we can delete it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304772107
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
 
 Review comment:
   no need to create `localResultPartition`, we can delete it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304769937
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -559,6 +561,57 @@ public void testUpdateUnknownInputChannel() throws 
Exception {
}
}
 
+   @Test
+   public void testQueuedBuffers() throws Exception {
+   final NettyShuffleEnvironment network = 
createNettyShuffleEnvironment();
+
+   final ResultPartition localResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   final ResultPartition remoteResultPartition = new 
ResultPartitionBuilder()
+   
.setResultPartitionManager(network.getResultPartitionManager())
+   
.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+   .build();
+
+   localResultPartition.setup();
+   remoteResultPartition.setup();
+
+   final SingleInputGate inputGate = createInputGate(network, 2, 
ResultPartitionType.PIPELINED);
+
+   try {
+   final ResultPartitionID localResultPartitionId = 
localResultPartition.getPartitionId();
+   addUnknownInputChannel(network, inputGate, 
localResultPartitionId, 0);
 
 Review comment:
   no need to add unknown channel


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304767046
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -558,5 +558,9 @@ public Void call() throws Exception {
 
return null;
}
+
+   public SingleInputGate getInputGate() {
 
 Review comment:
   It is dead code which should be removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304766052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 ##
 @@ -260,6 +260,17 @@ void releaseAllResources() throws IOException {
}
}
 
+   @Override
+   public int unsynchronizedGetNumberOfQueuedBuffers() {
+   ResultSubpartitionView view = subpartitionView;
+
+   if (view != null) {
+   return 
subpartitionView.unsynchronizedGetNumberOfQueuedBuffers();
 
 Review comment:
   we should call `view.unsynchronizedGetNumberOfQueuedBuffers` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304759017
 
 

 ##
 File path: docs/monitoring/metrics.md
 ##
 @@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
 
 Review comment:
   `ignores` -> `Consider`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304757301
 
 

 ##
 File path: docs/monitoring/metrics.md
 ##
 @@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
 
 Review comment:
   `using the queue of subpartitions`? `blocking` is not very accurate here 
because local channel might correspond to both pipelined and blocking 
subpartitions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304757393
 
 

 ##
 File path: docs/monitoring/metrics.zh.md
 ##
 @@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
 
 Review comment:
   ditto: `blocking` and `ignores` concern


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304757393
 
 

 ##
 File path: docs/monitoring/metrics.zh.md
 ##
 @@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
 
 Review comment:
   ditto: `blocking` and `ignores` concern


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304759017
 
 

 ##
 File path: docs/monitoring/metrics.md
 ##
 @@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
 
 Review comment:
   `ignores` -> `Consider`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304757301
 
 

 ##
 File path: docs/monitoring/metrics.md
 ##
 @@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
 
 Review comment:
   `using the queue of subpartitions`? `blocking` is not very accurate here 
because local channel might correspond to both pipelined and blocking 
subpartitions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, Metrics]Take localInputChannel into account when compute inputQueueLength

2019-07-18 Thread GitBox
zhijiangW commented on a change in pull request #8559: [FLINK-12576][Network, 
Metrics]Take localInputChannel into account when compute inputQueueLength
URL: https://github.com/apache/flink/pull/8559#discussion_r304757393
 
 

 ##
 File path: docs/monitoring/metrics.zh.md
 ##
 @@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
 
 Review comment:
   ditto: `blocking` concern


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services