[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298973347
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(buffersPerChannel)
+   .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+   .build();
+
+   Tuple3<SingleInputGate, List<RemoteInputChannel>, List<LocalInputChannel>> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   try {
+   SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   assertEquals(extraNetworkBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * buffersPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * buffersPerChannel + extraNetworkBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   } finally {
+   closeableRegistry.close();
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = 

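For readers following the quoted test, the arithmetic behind the three assertions in testCalculateTotalBuffersSize can be sketched without any Flink classes. This is only an illustration under the assumption, implied by the assertions above, that exclusive buffers scale with the number of remote channels and floating buffers are counted once per gate. The class and method names below are hypothetical and are not part of the PR or of Flink.

```java
// Hypothetical stand-alone sketch; none of these names exist in Flink.
final class BufferTotalsSketch {

    // Exclusive buffers: one fixed allotment per remote channel (assumption taken from the test's assertion).
    static int exclusiveBuffers(int remoteChannels, int buffersPerChannel) {
        return remoteChannels * buffersPerChannel;
    }

    // Floating buffers: configured once per input gate (assumption taken from the test's assertion).
    static int floatingBuffers(int floatingBuffersPerGate) {
        return floatingBuffersPerGate;
    }

    // Total for a gate: exclusive plus floating.
    static int totalBuffers(int remoteChannels, int buffersPerChannel, int floatingBuffersPerGate) {
        return exclusiveBuffers(remoteChannels, buffersPerChannel)
            + floatingBuffers(floatingBuffersPerGate);
    }

    public static void main(String[] args) {
        // Same values as the quoted test: 2 remote channels, 2 buffers per channel, 8 floating buffers.
        System.out.println(exclusiveBuffers(2, 2)); // prints 4
        System.out.println(floatingBuffers(8));     // prints 8
        System.out.println(totalBuffers(2, 2, 8));  // prints 12
    }
}
```

With the test's values this gives 4 exclusive, 8 floating and 12 total buffers, which matches the expected values passed to assertEquals in the quoted diff.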
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298972543
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298972293
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298972053
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298971802
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298971129
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298970097
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+    @Test
+    public void testCalculateTotalBuffersSize() throws IOException {
+        int numberOfRemoteChannels = 2;
+        int numberOfLocalChannels = 0;
+
+        int buffersPerChannel = 2;
+        int extraNetworkBuffersPerGate = 8;
+
+        CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+            .setNetworkBuffersPerChannel(buffersPerChannel)
+            .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+            .build();
+
+        Tuple3, List> tuple1 = buildInputGate(
+            network,
+            numberOfRemoteChannels,
+            numberOfLocalChannels);
+
+        SingleInputGate inputGate1 = tuple1.f0;
+
+        closeableRegistry.registerCloseable(network::close);
+        closeableRegistry.registerCloseable(inputGate1::close);
+
+        try {
+            SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
+            FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+            ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+            CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(
 
 Review comment:
   All these constructors could be moved out of the `try` block, and the 
`closeableRegistry.registerCloseable` calls above could be put inside `try` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298969531
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+    @Test
+    public void testCalculateTotalBuffersSize() throws IOException {
+        int numberOfRemoteChannels = 2;
+        int numberOfLocalChannels = 0;
+
+        int buffersPerChannel = 2;
+        int extraNetworkBuffersPerGate = 8;
+
+        CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+            .setNetworkBuffersPerChannel(buffersPerChannel)
+            .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+            .build();
+
+        Tuple3, List> tuple1 = buildInputGate(
+            network,
+            numberOfRemoteChannels,
+            numberOfLocalChannels);
+
+        SingleInputGate inputGate1 = tuple1.f0;
+
+        closeableRegistry.registerCloseable(network::close);
+        closeableRegistry.registerCloseable(inputGate1::close);
+
+        try {
 
 Review comment:
   Use `try (CloseableRegistry closeableRegistry = new CloseableRegistry())` to 
avoid the `finally` clause.
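
A minimal sketch of the try-with-resources form suggested above. `SimpleRegistry` is a hypothetical stand-in for Flink's `CloseableRegistry`, which is not reproduced here; the only assumption is that the registry is itself `Closeable` and closes whatever was registered with it. The reverse closing order is a choice made for this sketch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for CloseableRegistry: closes registered resources on close().
class SimpleRegistry implements Closeable {
    private final Deque<Closeable> resources = new ArrayDeque<>();

    void registerCloseable(Closeable resource) {
        resources.push(resource); // the last registered resource is closed first
    }

    @Override
    public void close() throws IOException {
        while (!resources.isEmpty()) {
            resources.pop().close();
        }
    }
}

class TryWithResourcesSketch {
    // Returns the resource names in the order in which they were closed.
    static List<String> run() throws IOException {
        List<String> closed = new ArrayList<>();
        // The registry is closed automatically when the block exits,
        // so no explicit finally clause is needed.
        try (SimpleRegistry registry = new SimpleRegistry()) {
            registry.registerCloseable(() -> closed.add("network"));
            registry.registerCloseable(() -> closed.add("inputGate1"));
            // ... the gauge assertions of the test would go here ...
        }
        return closed;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(run());
    }
}
```

The registry is still closed if an assertion inside the block throws, which is the same guarantee the original `finally` clause gave.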




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298969332
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+    @Test
+    public void testCalculateTotalBuffersSize() throws IOException {
+        int numberOfRemoteChannels = 2;
+        int numberOfLocalChannels = 0;
+
+        int buffersPerChannel = 2;
 
 Review comment:
   `buffersPerChannel` -> `numberOfBufferPerChannel`
   `extraNetworkBuffersPerGate` -> `numberOfBuffersPerGate`




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298948536
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
 ##
 @@ -154,24 +157,40 @@ private static void registerOutputMetrics(
 
 public static void registerInputMetrics(
         boolean isDetailedMetrics,
+        boolean isCreditBased,
         MetricGroup inputGroup,
         SingleInputGate[] inputGates) {
     registerInputMetrics(
         isDetailedMetrics,
+        isCreditBased,
         inputGroup,
         inputGroup.addGroup(METRIC_GROUP_BUFFERS),
         inputGates);
 }
 
 private static void registerInputMetrics(
         boolean isDetailedMetrics,
+        boolean isCreditBased,
         MetricGroup inputGroup,
         MetricGroup buffersGroup,
         SingleInputGate[] inputGates) {
     if (isDetailedMetrics) {
         InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
     }
+
     buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
+
+    if (isCreditBased) {
+        FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+
+        buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, exclusiveBuffersUsageGauge);
+        buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, floatingBuffersUsageGauge);
+        buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
 
 Review comment:
   I think my remaining issues are only with the tests, and I will post them later today.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298919897
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
 ##
 @@ -154,24 +157,40 @@ private static void registerOutputMetrics(
 
 public static void registerInputMetrics(
         boolean isDetailedMetrics,
+        boolean isCreditBased,
         MetricGroup inputGroup,
         SingleInputGate[] inputGates) {
     registerInputMetrics(
         isDetailedMetrics,
+        isCreditBased,
         inputGroup,
         inputGroup.addGroup(METRIC_GROUP_BUFFERS),
         inputGates);
 }
 
 private static void registerInputMetrics(
         boolean isDetailedMetrics,
+        boolean isCreditBased,
         MetricGroup inputGroup,
         MetricGroup buffersGroup,
         SingleInputGate[] inputGates) {
     if (isDetailedMetrics) {
         InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
     }
+
     buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
+
+    if (isCreditBased) {
+        FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+        ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+
+        buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, exclusiveBuffersUsageGauge);
+        buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, floatingBuffersUsageGauge);
+        buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
 
 Review comment:
   nit: I think it is better to define a local variable for 
`CreditBasedInputBuffersUsageGauge`, as done above for `floatingBuffersUsageGauge` and 
`exclusiveBuffersUsageGauge`, because the line break does not read well here.
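
A small sketch of the local-variable style suggested in this nit. Everything here is hypothetical: `Supplier<Float>` stands in for the gauge types, the map stands in for the metric group, the string keys stand in for the `METRIC_INPUT_*` constants, and averaging the two inputs is only a placeholder for whatever the real combined gauge computes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

class LocalGaugeVariableSketch {
    // Registers three stand-in gauges under hypothetical names.
    static Map<String, Supplier<Float>> register(Supplier<Float> floating, Supplier<Float> exclusive) {
        Map<String, Supplier<Float>> buffersGroup = new LinkedHashMap<>();

        // Naming the composite gauge first keeps every registration on one short line.
        // The average below is a placeholder, not the real calculation.
        Supplier<Float> combined = () -> (floating.get() + exclusive.get()) / 2;

        buffersGroup.put("exclusive", exclusive);
        buffersGroup.put("floating", floating);
        buffersGroup.put("pool", combined);
        return buffersGroup;
    }

    public static void main(String[] args) {
        Map<String, Supplier<Float>> group = register(() -> 0.5f, () -> 1.0f);
        System.out.println(group.get("pool").get());
    }
}
```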




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-27 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r298434626
 
 

 ##
 File path: docs/_includes/generated/blob_server_configuration.html
 ##
 @@ -7,16 +7,16 @@
 
 
 
-
-blob.client.socket.timeout
-30
-The socket timeout in milliseconds for the blob client.
-
 
 blob.client.connect.timeout
 0
 The connection timeout in milliseconds for the blob 
client.
 
+
+blob.client.socket.timeout
 
 Review comment:
   Sorry for the trouble here. I found that other contributors hit the same 
issue with unrelated HTML changes. I will double-check which PR originally 
caused this change; you can ignore it in your PR.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297524179
 
 

 ##
 File path: docs/_includes/generated/blob_server_configuration.html
 ##
 @@ -7,16 +7,16 @@
 
 
 
-
-blob.client.socket.timeout
-30
-The socket timeout in milliseconds for the blob client.
-
 
 blob.client.connect.timeout
 0
 The connection timeout in milliseconds for the blob 
client.
 
+
+blob.client.socket.timeout
 
 Review comment:
   I do not think that should be the case. Your `metrics.md` change should 
probably not affect any HTML files.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297511462
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+    @Test
+    public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+        int numberOfRemoteChannelsGate1 = 2;
+        int numberOfLocalChannelsGate1 = 0;
+        int numberOfRemoteChannelsGate2 = 1;
+        int numberOfLocalChannelsGate2 = 1;
+
+        int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+        int numberOfInputGates = 2;
+
+        int buffersPerChannel = 2;
+        int extraNetworkBuffersPerGate = 8;
+
+        final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+            .setNetworkBuffersPerChannel(buffersPerChannel)
+            .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+            .build();
+
+        final Tuple3, List> tuple1 = buildInputGate(network,
 
 Review comment:
   A tuple is not a good choice here unless it is really necessary.
   
   I think you only need one `RemoteInputChannel` for the 
`requestSubpartition` and `onSenderBacklog` calls below. Alternatively, you 
could construct the `SingleInputGate` first and then construct the relevant channel 
via `InputChannelBuilder#buildRemoteAndSetToGate(gate)`.
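
The gate-first construction described above can be sketched as follows. `SketchGate`, `SketchChannel` and the static helper are hypothetical stand-ins, not Flink's `SingleInputGate`, `RemoteInputChannel` or `InputChannelBuilder`; the sketch only shows how the test can hold a direct reference to the one channel it needs without returning a tuple.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an input gate that owns its channels.
class SketchGate {
    final List<SketchChannel> channels = new ArrayList<>();
}

// Hypothetical stand-in for a remote input channel.
class SketchChannel {
    final int index;

    SketchChannel(int index) {
        this.index = index;
    }
}

class GateFirstSketch {
    // Builds a channel, attaches it to the given gate, and also returns it,
    // so the caller needs neither a tuple nor a lookup by position.
    static SketchChannel buildRemoteAndSetToGate(SketchGate gate) {
        SketchChannel channel = new SketchChannel(gate.channels.size());
        gate.channels.add(channel);
        return channel;
    }

    public static void main(String[] args) {
        SketchGate gate = new SketchGate();                      // 1. construct the gate first
        SketchChannel channel = buildRemoteAndSetToGate(gate);   // 2. then the channel the test needs
        System.out.println(gate.channels.size() + " channel(s), index " + channel.index);
    }
}
```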




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297509524
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+    @Test
+    public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+        int numberOfRemoteChannelsGate1 = 2;
+        int numberOfLocalChannelsGate1 = 0;
+        int numberOfRemoteChannelsGate2 = 1;
+        int numberOfLocalChannelsGate2 = 1;
+
+        int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+        int numberOfInputGates = 2;
+
+        int buffersPerChannel = 2;
+        int extraNetworkBuffersPerGate = 8;
+
+        final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+        final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+            .setNetworkBuffersPerChannel(buffersPerChannel)
+            .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+            .build();
+
+        final Tuple3, List> tuple1 = buildInputGate(network,
+            numberOfRemoteChannelsGate1,
+            numberOfLocalChannelsGate1);
+        final Tuple3, List> tuple2 = buildInputGate(network,
+            numberOfRemoteChannelsGate2,
+            numberOfLocalChannelsGate2);
+
+        final SingleInputGate inputGate1 = tuple1.f0;
+        final SingleInputGate inputGate2 = tuple2.f0;
+
+        try {
+
+            assertEquals(tuple1.f1.size(), numberOfRemoteChannelsGate1 + numberOfLocalChannelsGate1);
+            RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
+            RemoteInputChannel remoteInputChannel2 = tuple1.f1.get(1);
+
+            SingleInputGate[] inputGates = new SingleInputGate[]{tuple1.f0, tuple2.f0};
+            FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+            ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+            CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
+                exclusiveBuffersUsageGauge, inputGates);
+
+            assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+            assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+            assertEquals(0.0, inputBufferPoolUsageGauge.getValue(), 0.0);
+
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297508630
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+    @Test
+    public void testBufferUsageMetrics() throws IOException, InterruptedException {
 
 Review comment:
   TBH, I think this test is too big: it mixes many functions that are all 
verified together. 
   
   My suggestion is to break it into several simple tests so that each one 
verifies only a single function. That would make it much easier to review 
and maintain. 
   
   E.g. if someone changes the internal backlog logic in the future and this 
test fails, they have to read through more than 100 lines to trace the reason 
before they can fix it. If each test is short and covers a single function, 
it is much easier for other people to work with, and you would also find it 
easier to write each unit test. I could give some examples below.
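
To illustrate the one-function-per-test idea, here is a rough sketch in plain Java. It uses the buffer totals asserted elsewhere in this thread (exclusive = remote channels × buffers per channel, floating = floating buffers per gate, combined = their sum). The helper methods and the `check` function are hypothetical stand-ins for the gauge classes and JUnit assertions.

```java
class BufferTotalsSketch {
    // Exclusive buffers: every remote channel owns a fixed number of buffers.
    static int exclusiveTotal(int remoteChannels, int buffersPerChannel) {
        return remoteChannels * buffersPerChannel;
    }

    // Floating buffers: one shared pool per gate.
    static int floatingTotal(int floatingBuffersPerGate) {
        return floatingBuffersPerGate;
    }

    // Combined total: exclusive plus floating buffers.
    static int combinedTotal(int remoteChannels, int buffersPerChannel, int floatingBuffersPerGate) {
        return exclusiveTotal(remoteChannels, buffersPerChannel) + floatingTotal(floatingBuffersPerGate);
    }

    // Minimal assertion helper so the sketch runs without JUnit.
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // Each check below verifies exactly one function, so a failure points at one place.
    static void testExclusiveTotal() {
        check(exclusiveTotal(2, 2) == 4, "exclusive total");
    }

    static void testFloatingTotal() {
        check(floatingTotal(8) == 8, "floating total");
    }

    static void testCombinedTotal() {
        check(combinedTotal(2, 2, 8) == 12, "combined total");
    }

    public static void main(String[] args) {
        testExclusiveTotal();
        testFloatingTotal();
        testCombinedTotal();
        System.out.println("all checks passed");
    }
}
```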




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297497629
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(buffersPerChannel)
+   .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+   .build();
+
+   final Tuple3, List> tuple1 = buildInputGate(network,
+   numberOfRemoteChannelsGate1,
+   numberOfLocalChannelsGate1);
+   final Tuple3, List> tuple2 = buildInputGate(network,
 
 Review comment:
   ditto




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297497664
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(buffersPerChannel)
+   .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+   .build();
+
+   final Tuple3, List> tuple1 = buildInputGate(network,
+   numberOfRemoteChannelsGate1,
+   numberOfLocalChannelsGate1);
+   final Tuple3, List> tuple2 = buildInputGate(network,
+   numberOfRemoteChannelsGate2,
+   numberOfLocalChannelsGate2);
+
+   final SingleInputGate inputGate1 = tuple1.f0;
+   final SingleInputGate inputGate2 = tuple2.f0;
+
+   try {
+
+   assertEquals(tuple1.f1.size(), numberOfRemoteChannelsGate1 + numberOfLocalChannelsGate1);
+   RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
+   RemoteInputChannel remoteInputChannel2 = tuple1.f1.get(1);
+
+   SingleInputGate[] inputGates = new SingleInputGate[]{tuple1.f0, tuple2.f0};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
 
 Review comment:
   ditto



[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297497609
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(buffersPerChannel)
+   .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+   .build();
+
+   final Tuple3, List> tuple1 = buildInputGate(network,
 
 Review comment:
   parameter formatting




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297497075
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
 
 Review comment:
   You could remove final from these variables; otherwise all the variables, 
including the int variables above, should be final to stay consistent.
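
A small sketch of the two consistent options, for illustration only. The class FinalConsistencySketch and both method names are hypothetical; only the values 3 and 2 echo the channel and buffer counts in the test above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: shows the style choice only, not the actual Flink test.
public class FinalConsistencySketch {

    // Option 1: no local variable is declared final.
    static int exclusiveBuffersWithoutFinal() {
        int remoteChannels = 3;
        int buffersPerChannel = 2;
        List<Integer> buffers = new ArrayList<>();
        for (int i = 0; i < remoteChannels; i++) {
            buffers.add(buffersPerChannel);
        }
        return buffers.stream().mapToInt(Integer::intValue).sum();
    }

    // Option 2: every local variable that is never reassigned is declared final.
    static int exclusiveBuffersWithFinal() {
        final int remoteChannels = 3;
        final int buffersPerChannel = 2;
        final List<Integer> buffers = new ArrayList<>();
        for (int i = 0; i < remoteChannels; i++) {
            buffers.add(buffersPerChannel);
        }
        return buffers.stream().mapToInt(Integer::intValue).sum();
    }
}
```

Both variants behave identically; the comment is only about picking one style and applying it to every local variable in the method.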




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297496069
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(buffersPerChannel)
+   .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+   .build();
+
+   final Tuple3, List> tuple1 = buildInputGate(network,
+   numberOfRemoteChannelsGate1,
+   numberOfLocalChannelsGate1);
+   final Tuple3, List> tuple2 = buildInputGate(network,
+   numberOfRemoteChannelsGate2,
+   numberOfLocalChannelsGate2);
+
+   final SingleInputGate inputGate1 = tuple1.f0;
+   final SingleInputGate inputGate2 = tuple2.f0;
+
+   try {
+
+   assertEquals(tuple1.f1.size(), numberOfRemoteChannelsGate1 + numberOfLocalChannelsGate1);
+   RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
+   RemoteInputChannel remoteInputChannel2 = tuple1.f1.get(1);
+
+   SingleInputGate[] inputGates = new SingleInputGate[]{tuple1.f0, tuple2.f0};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge, inputGates);
+
+   assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+   assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+   assertEquals(0.0, inputBufferPoolUsageGauge.getValue(), 0.0);
+
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297496295
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(buffersPerChannel)
+   .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+   .build();
+
+   final Tuple3, List> tuple1 = buildInputGate(network,
+   numberOfRemoteChannelsGate1,
+   numberOfLocalChannelsGate1);
+   final Tuple3, List> tuple2 = buildInputGate(network,
+   numberOfRemoteChannelsGate2,
+   numberOfLocalChannelsGate2);
+
+   final SingleInputGate inputGate1 = tuple1.f0;
+   final SingleInputGate inputGate2 = tuple2.f0;
+
+   try {
+
+   assertEquals(tuple1.f1.size(), numberOfRemoteChannelsGate1 + numberOfLocalChannelsGate1);
+   RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
+   RemoteInputChannel remoteInputChannel2 = tuple1.f1.get(1);
+
+   SingleInputGate[] inputGates = new SingleInputGate[]{tuple1.f0, tuple2.f0};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge, inputGates);
+
+   assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+   assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+   assertEquals(0.0, inputBufferPoolUsageGauge.getValue(), 0.0);
+
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297496220
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(buffersPerChannel)
+   .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+   .build();
+
+   final Tuple3, List> tuple1 = buildInputGate(network,
+   numberOfRemoteChannelsGate1,
+   numberOfLocalChannelsGate1);
+   final Tuple3, List> tuple2 = buildInputGate(network,
+   numberOfRemoteChannelsGate2,
+   numberOfLocalChannelsGate2);
+
+   final SingleInputGate inputGate1 = tuple1.f0;
+   final SingleInputGate inputGate2 = tuple2.f0;
+
+   try {
+
+   assertEquals(tuple1.f1.size(), numberOfRemoteChannelsGate1 + numberOfLocalChannelsGate1);
+   RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
+   RemoteInputChannel remoteInputChannel2 = tuple1.f1.get(1);
+
+   SingleInputGate[] inputGates = new SingleInputGate[]{tuple1.f0, tuple2.f0};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge, inputGates);
+
+   assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+   assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+   assertEquals(0.0, inputBufferPoolUsageGauge.getValue(), 0.0);
+
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297495056
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
 ##
 @@ -154,24 +157,40 @@ private static void registerOutputMetrics(
 
public static void registerInputMetrics(
boolean isDetailedMetrics,
+   boolean isCreditBased,
MetricGroup inputGroup,
SingleInputGate[] inputGates) {
registerInputMetrics(
isDetailedMetrics,
+   isCreditBased,
inputGroup,
inputGroup.addGroup(METRIC_GROUP_BUFFERS),
inputGates);
}
 
private static void registerInputMetrics(
boolean isDetailedMetrics,
+   boolean isCreditBased,
MetricGroup inputGroup,
MetricGroup buffersGroup,
SingleInputGate[] inputGates) {
if (isDetailedMetrics) {
InputGateMetrics.registerQueueLengthMetrics(inputGroup, 
inputGates);
}
+
buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new 
InputBuffersGauge(inputGates));
+
+   if (isCreditBased) {
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = 
new FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = 
new ExclusiveBuffersUsageGauge(inputGates);
+
+   
buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, 
exclusiveBuffersUsageGauge);
+   buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, 
floatingBuffersUsageGauge);
+   buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new 
CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
 
 Review comment:
   Please do not split a call's arguments unevenly across lines. Put 
`METRIC_INPUT_POOL_USAGE` on its own line and `new 
CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge, 
exclusiveBuffersUsageGauge, inputGates)` on its own line.
   Alternatively, define a local variable for the `CreditBasedInputBuffersUsageGauge`, 
as is done above for `floatingBuffersUsageGauge` and `exclusiveBuffersUsageGauge`.
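
   The two layouts suggested here could look roughly as follows. This is only a 
minimal, self-contained sketch: `GroupSketch` and the `Supplier<Float>` gauges are 
hypothetical stand-ins for Flink's `MetricGroup` and the gauge classes in this PR, 
and the averaging in `combined` is a placeholder, not the PR's formula.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a metric group; it only stores gauges by name.
final class GroupSketch {
    final Map<String, Supplier<Float>> gauges = new LinkedHashMap<>();

    void gauge(String name, Supplier<Float> gauge) {
        gauges.put(name, gauge);
    }
}

final class GaugeLayoutSketch {
    static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";

    // Placeholder combination of two gauges (assumption, for illustration only).
    static Supplier<Float> combined(Supplier<Float> floating, Supplier<Float> exclusive) {
        return () -> (floating.get() + exclusive.get()) / 2;
    }

    // Layout 1: every argument of gauge(...) sits on its own line, none is split.
    static void registerInline(GroupSketch group, Supplier<Float> floating, Supplier<Float> exclusive) {
        group.gauge(
            METRIC_INPUT_POOL_USAGE,
            combined(floating, exclusive));
    }

    // Layout 2: a local variable names the combined gauge before it is registered.
    static void registerWithLocal(GroupSketch group, Supplier<Float> floating, Supplier<Float> exclusive) {
        Supplier<Float> creditBasedUsage = combined(floating, exclusive);
        group.gauge(METRIC_INPUT_POOL_USAGE, creditBasedUsage);
    }
}
```

   Both methods register the same gauge; they differ only in how the call is laid out.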


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297494125
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
 ##
 @@ -154,24 +157,40 @@ private static void registerOutputMetrics(
 
public static void registerInputMetrics(
boolean isDetailedMetrics,
+   boolean isCreditBased,
MetricGroup inputGroup,
SingleInputGate[] inputGates) {
registerInputMetrics(
isDetailedMetrics,
+   isCreditBased,
inputGroup,
inputGroup.addGroup(METRIC_GROUP_BUFFERS),
inputGates);
}
 
private static void registerInputMetrics(
boolean isDetailedMetrics,
+   boolean isCreditBased,
MetricGroup inputGroup,
MetricGroup buffersGroup,
SingleInputGate[] inputGates) {
if (isDetailedMetrics) {
InputGateMetrics.registerQueueLengthMetrics(inputGroup, 
inputGates);
}
+
buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new 
InputBuffersGauge(inputGates));
+
+   if (isCreditBased) {
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = 
new FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = 
new ExclusiveBuffersUsageGauge(inputGates);
+
+   
buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, 
exclusiveBuffersUsageGauge);
+   buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, 
floatingBuffersUsageGauge);
+   buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new 
CreditBasedInputBuffersUsageGauge(floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge, inputGates));
+   } else {
+   buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new 
InputBufferPoolUsageGauge(inputGates));
+   }
buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new 
InputBufferPoolUsageGauge(inputGates));
 
 Review comment:
   Please remove this call; the `else` branch above already registers this gauge.
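
   A minimal sketch of the intended control flow, assuming a hypothetical 
`RecordingGroupSketch` that simply records each registration (it is not Flink's 
`MetricGroup`, and the gauge kinds are plain strings here): the pool-usage metric is 
registered exactly once, in whichever branch applies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical recorder that logs every registration call.
final class RecordingGroupSketch {
    final List<String> registered = new ArrayList<>();

    void gauge(String name, String gaugeKind) {
        registered.add(name + "=" + gaugeKind);
    }
}

final class InputMetricsSketch {
    static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";

    static void registerPoolUsage(RecordingGroupSketch group, boolean isCreditBased) {
        if (isCreditBased) {
            group.gauge(METRIC_INPUT_POOL_USAGE, "creditBased");
        } else {
            group.gauge(METRIC_INPUT_POOL_USAGE, "bufferPool");
        }
        // Nothing is registered under METRIC_INPUT_POOL_USAGE after the branch;
        // a trailing call here would register the same name a second time.
    }
}
```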




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297493392
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
 ##
 @@ -66,6 +66,8 @@
 
private static final String METRIC_INPUT_QUEUE_LENGTH = 
"inputQueueLength";
private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+   private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = 
"floatingBuffersUsage";
 
 Review comment:
   Rename `floatingBuffersUsage` to `inputFloatingBuffersUsage`, and add the same 
`input` prefix to the following `exclusiveBuffersUsage`. The metric names are then 
easy to distinguish and stay consistent with the others.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297491959
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link 
SingleInputGate}s under credit based mode.
 
 Review comment:
   Remove `pool` so that the Javadoc reads `measuring the input buffers usage for` 
the `SingleInputGate`s.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297491040
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Abstract gauge implementation for calculate the buffer usage percent.
+ */
+public abstract class AbstractBuffersUsageGauge implements Gauge<Float> {
+
+   protected SingleInputGate[] inputGates;
+
+   @VisibleForTesting
+   public abstract int calculateUsedBuffers(SingleInputGate inputGate);
+
+   @VisibleForTesting
+   public abstract int calculateBufferPoolSize(SingleInputGate inputGate);
+
+   AbstractBuffersUsageGauge(SingleInputGate[] inputGates) {
+   this.inputGates = inputGates;
+   }
+
+   @Override
+   public Float getValue() {
+   int usedBuffers = 0;
+   int bufferPoolSize = 0;
+
+   for (SingleInputGate inputGate : inputGates) {
 
 Review comment:
   Agreed with the comment above.






[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297490492
 
 

 ##
 File path: 
docs/_includes/generated/netty_shuffle_environment_configuration.html
 ##
 @@ -0,0 +1,61 @@
+
 
 Review comment:
   Ditto: this file should not be changed either.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-25 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r297490427
 
 

 ##
 File path: docs/_includes/generated/blob_server_configuration.html
 ##
 @@ -7,16 +7,16 @@
 
 
 
-
-blob.client.socket.timeout
-30
-The socket timeout in milliseconds for the blob client.
-
 
 blob.client.connect.timeout
 0
 The connection timeout in milliseconds for the blob 
client.
 
+
+blob.client.socket.timeout
 
 Review comment:
   Why did this change? I think it should not be changed; please double-check.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-24 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r296993927
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link 
SingleInputGate}s under credit based mode.
+ */
+public class CreditBasedInputBuffersUsageGauge extends 
AbstractBuffersUsageGauge {
+
+   private final FloatingBuffersUsageGauge floatingBuffersUsageGauge;
+   private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;
+
+   public CreditBasedInputBuffersUsageGauge(
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge,
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
+   SingleInputGate[] inputGates) {
+   super(checkNotNull(inputGates));
+   this.floatingBuffersUsageGauge = floatingBuffersUsageGauge;
 
 Review comment:
   Please also add `checkNotNull` for `floatingBuffersUsageGauge` and 
`exclusiveBuffersUsageGauge`.
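
   A minimal sketch of the requested checks. To stay self-contained it uses the JDK's 
`Objects.requireNonNull` in place of the `Preconditions.checkNotNull` imported in the 
hunk above; `CombinedGaugeSketch` and its `Object`-typed fields are hypothetical 
stand-ins for the real gauge class and its dependencies.

```java
import java.util.Objects;

// Hypothetical, simplified holder for two gauge dependencies.
final class CombinedGaugeSketch {
    private final Object floatingGauge;
    private final Object exclusiveGauge;

    CombinedGaugeSketch(Object floatingGauge, Object exclusiveGauge) {
        // Fail at construction time rather than on the first metric read.
        this.floatingGauge = Objects.requireNonNull(floatingGauge, "floatingGauge");
        this.exclusiveGauge = Objects.requireNonNull(exclusiveGauge, "exclusiveGauge");
    }

    Object floatingGauge() {
        return floatingGauge;
    }

    Object exclusiveGauge() {
        return exclusiveGauge;
    }

    // Returns true if constructing with a null dependency is rejected.
    static boolean rejectsNull() {
        try {
            new CombinedGaugeSketch(null, new Object());
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }
}
```

   With the checks in place, a missing dependency surfaces where the gauge is built 
instead of later, when a reporter first reads the value.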




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295127345
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Abstract gauge implementation for calculate the buffer usage percent.
+ */
+public abstract class AbstractBuffersUsageGauge implements Gauge<Float> {
+
+   protected SingleInputGate[] inputGates;
+
+   @VisibleForTesting
+   public abstract int calculateUsedBuffers(SingleInputGate inputGate);
+
+   @VisibleForTesting
+   public abstract int calculateBufferPoolSize(SingleInputGate inputGate);
+
+   AbstractBuffersUsageGauge(SingleInputGate[] inputGates) {
+   this.inputGates = inputGates;
+   }
+
+   @Override
+   public Float getValue() {
+   int usedBuffers = 0;
+   int bufferPoolSize = 0;
+
+   for (SingleInputGate inputGate : inputGates) {
 
 Review comment:
   Yes, I only pointed out a different idea and have not thought it through yet. 
   The current approach keeps the previous behavior, and I can accept it for now if 
there is no better solution.
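
   For reference, the aggregation discussed here can be sketched as below. The quoted 
hunk is cut off inside `getValue()`, so everything past the loop header is an 
assumption: the sketch sums used buffers and pool sizes over all gates and reports 
their ratio, returning 0 when no buffers are assigned. `GateSnapshot` is a 
hypothetical stand-in for the per-gate values that `calculateUsedBuffers` and 
`calculateBufferPoolSize` would provide.

```java
// Hypothetical per-gate snapshot of buffer counts.
final class GateSnapshot {
    final int usedBuffers;
    final int bufferPoolSize;

    GateSnapshot(int usedBuffers, int bufferPoolSize) {
        this.usedBuffers = usedBuffers;
        this.bufferPoolSize = bufferPoolSize;
    }
}

final class BuffersUsageSketch {
    // Ratio of used buffers to total pool size, aggregated over all gates.
    static float usage(GateSnapshot[] gates) {
        int usedBuffers = 0;
        int bufferPoolSize = 0;

        for (GateSnapshot gate : gates) {
            usedBuffers += gate.usedBuffers;
            bufferPoolSize += gate.bufferPoolSize;
        }

        // Assumption: report 0 for an empty pool to avoid dividing by zero.
        if (bufferPoolSize == 0) {
            return 0.0f;
        }
        return ((float) usedBuffers) / bufferPoolSize;
    }
}
```

   Summing before dividing weights each gate by its pool size, which is different 
from averaging the per-gate ratios.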




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295118519
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
 ##
 @@ -84,6 +87,9 @@
private static final String METRIC_OUTPUT_POOL_USAGE = "outPoolUsage";
private static final String METRIC_INPUT_QUEUE_LENGTH = 
"inputQueueLength";
private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+   private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = 
"floatingBuffersUsage";
+   private static final String METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE = 
"exclusiveBuffersUsage";
+
 
 Review comment:
   It is not covered by checkstyle, but I think it should be common sense. :)




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295115863
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, 
InterruptedException {
+
+   CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder().setIsCreditBased(true).build();
+
+   final SingleInputGate inputGate1 = new SingleInputGateBuilder()
+   .setNumberOfChannels(2)
+   .setIsCreditBased(true)
+   
.setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+   .setupBufferPoolFactory(network)
+   .build();
+
+   final SingleInputGate inputGate2 =  new SingleInputGateBuilder()
+   .setNumberOfChannels(2)
+   .setIsCreditBased(true)
+   
.setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+   .setupBufferPoolFactory(network)
+   .build();
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   try {
+   final ResultPartitionID resultPartitionId1 = new 
ResultPartitionID();
+   final ResultPartitionID resultPartitionId2 = new 
ResultPartitionID();
+
+   final ConnectionID connectionId1 = new ConnectionID(new 
InetSocketAddress("localhost", 5000), 0);
+   final ConnectionID connectionId2 = new ConnectionID(new 
InetSocketAddress("localhost", 5000), 0);
+
+   RemoteInputChannel remoteInputChannel1 = 
createUnknownInputChannel(network, inputGate1, resultPartitionId1, 
0).toRemoteInputChannel(connectionId1);
+   
inputGate1.setInputChannel(resultPartitionId1.getPartitionId(), 
remoteInputChannel1);
+
+   RemoteInputChannel remoteInputChannel2 = 
createUnknownInputChannel(network, inputGate1, resultPartitionId2, 
1).toRemoteInputChannel(connectionId2);
+   
inputGate1.setInputChannel(resultPartitionId2.getPartitionId(), 
remoteInputChannel2);
+
+   RemoteInputChannel remoteInputChannel3 = 
createUnknownInputChannel(network, inputGate2, resultPartitionId1, 
0).toRemoteInputChannel(connectionId1);
+ 

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295115472
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+       CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+       final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setIsCreditBased(true).build();
+
+       final SingleInputGate inputGate1 = new SingleInputGateBuilder()
+           .setNumberOfChannels(2)
+           .setIsCreditBased(true)
+           .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+           .setupBufferPoolFactory(network)
+           .build();
+
+       final SingleInputGate inputGate2 =  new SingleInputGateBuilder()
+           .setNumberOfChannels(2)
+           .setIsCreditBased(true)
+           .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+           .setupBufferPoolFactory(network)
+           .build();
+
+       int buffersPerChannel = 2;
+       int extraNetworkBuffersPerGate = 8;
+
+       try {
+           final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
+           final ResultPartitionID resultPartitionId2 = new ResultPartitionID();
+
+           final ConnectionID connectionId1 = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+           final ConnectionID connectionId2 = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+
+           RemoteInputChannel remoteInputChannel1 = createUnknownInputChannel(network, inputGate1, resultPartitionId1, 0).toRemoteInputChannel(connectionId1);
+           inputGate1.setInputChannel(resultPartitionId1.getPartitionId(), remoteInputChannel1);
+
+           RemoteInputChannel remoteInputChannel2 = createUnknownInputChannel(network, inputGate1, resultPartitionId2, 1).toRemoteInputChannel(connectionId2);
+           inputGate1.setInputChannel(resultPartitionId2.getPartitionId(), remoteInputChannel2);
+
+           RemoteInputChannel remoteInputChannel3 = createUnknownInputChannel(network, inputGate2, resultPartitionId1, 0).toRemoteInputChannel(connectionId1);
+ 

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295115200
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+       CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+       final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setIsCreditBased(true).build();
+
+       final SingleInputGate inputGate1 = new SingleInputGateBuilder()
+           .setNumberOfChannels(2)
+           .setIsCreditBased(true)
+           .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+           .setupBufferPoolFactory(network)
+           .build();
+
+       final SingleInputGate inputGate2 =  new SingleInputGateBuilder()
+           .setNumberOfChannels(2)
+           .setIsCreditBased(true)
+           .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+           .setupBufferPoolFactory(network)
+           .build();
+
+       int buffersPerChannel = 2;
+       int extraNetworkBuffersPerGate = 8;
+
+       try {
+           final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
+           final ResultPartitionID resultPartitionId2 = new ResultPartitionID();
+
+           final ConnectionID connectionId1 = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+           final ConnectionID connectionId2 = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+
+           RemoteInputChannel remoteInputChannel1 = createUnknownInputChannel(network, inputGate1, resultPartitionId1, 0).toRemoteInputChannel(connectionId1);
+           inputGate1.setInputChannel(resultPartitionId1.getPartitionId(), remoteInputChannel1);
+
+           RemoteInputChannel remoteInputChannel2 = createUnknownInputChannel(network, inputGate1, resultPartitionId2, 1).toRemoteInputChannel(connectionId2);
+           inputGate1.setInputChannel(resultPartitionId2.getPartitionId(), remoteInputChannel2);
+
+           RemoteInputChannel remoteInputChannel3 = createUnknownInputChannel(network, inputGate2, resultPartitionId1, 0).toRemoteInputChannel(connectionId1);
+ 

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295114415
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295114348
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295113109
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295112375
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+       CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+       final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setIsCreditBased(true).build();
+
+       final SingleInputGate inputGate1 = new SingleInputGateBuilder()
+           .setNumberOfChannels(2)
+           .setIsCreditBased(true)
+           .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+           .setupBufferPoolFactory(network)
+           .build();
+
+       final SingleInputGate inputGate2 =  new SingleInputGateBuilder()
+           .setNumberOfChannels(2)
+           .setIsCreditBased(true)
+           .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+           .setupBufferPoolFactory(network)
+           .build();
+
+       int buffersPerChannel = 2;
+       int extraNetworkBuffersPerGate = 8;
+
+       try {
+           final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
+           final ResultPartitionID resultPartitionId2 = new ResultPartitionID();
+
+           final ConnectionID connectionId1 = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+           final ConnectionID connectionId2 = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+
+           RemoteInputChannel remoteInputChannel1 = createUnknownInputChannel(network, inputGate1, resultPartitionId1, 0).toRemoteInputChannel(connectionId1);
 
 Review comment:
   We could use `new InputChannelBuilder().buildRemoteAndSetToGate(inputGate1)` 
here instead, and likewise for the local channel that follows.
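
   To illustrate the shape of this suggestion, here is a minimal, self-contained sketch of a test builder that creates a channel and sets it on the gate in a single call. `Gate`, `Channel`, and `ChannelBuilder` are hypothetical stand-ins written for this example; they are not Flink's `SingleInputGate`, `RemoteInputChannel`, or `InputChannelBuilder`, and `buildLocalAndSetToGate` is an assumed counterpart for the local channel.

```java
import java.util.ArrayList;
import java.util.List;

public class ChannelBuilderSketch {

    /** Hypothetical stand-in for an input gate: it only records the channels set on it. */
    static final class Gate {
        private final List<Channel> channels = new ArrayList<>();

        void setInputChannel(Channel channel) {
            channels.add(channel);
        }

        int numberOfChannels() {
            return channels.size();
        }
    }

    /** Hypothetical stand-in for an input channel. */
    static final class Channel {
        final int index;
        final boolean remote;

        Channel(int index, boolean remote) {
            this.index = index;
            this.remote = remote;
        }
    }

    /** Hypothetical test builder: creates a channel and registers it with the gate in one step. */
    static final class ChannelBuilder {
        private int channelIndex = 0;

        ChannelBuilder setChannelIndex(int channelIndex) {
            this.channelIndex = channelIndex;
            return this;
        }

        Channel buildRemoteAndSetToGate(Gate gate) {
            final Channel channel = new Channel(channelIndex, true);
            gate.setInputChannel(channel);
            return channel;
        }

        Channel buildLocalAndSetToGate(Gate gate) {
            final Channel channel = new Channel(channelIndex, false);
            gate.setInputChannel(channel);
            return channel;
        }
    }

    public static void main(String[] args) {
        final Gate gate = new Gate();
        // One call replaces the separate "create channel" and "set on gate" steps.
        new ChannelBuilder().setChannelIndex(0).buildRemoteAndSetToGate(gate);
        new ChannelBuilder().setChannelIndex(1).buildLocalAndSetToGate(gate);
        System.out.println("channels registered: " + gate.numberOfChannels());
    }
}
```

   The point of the pattern is that the test no longer has to repeat the create-then-register pair for every channel, which is where the quoted test spends most of its setup lines.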


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295110430
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+       CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+       final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setIsCreditBased(true).build();
+
+       final SingleInputGate inputGate1 = new SingleInputGateBuilder()
+           .setNumberOfChannels(2)
+           .setIsCreditBased(true)
+           .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+           .setupBufferPoolFactory(network)
+           .build();
+
+       final SingleInputGate inputGate2 =  new SingleInputGateBuilder()
+           .setNumberOfChannels(2)
+           .setIsCreditBased(true)
 
 Review comment:
   ditto: `setIsCreditBased(true)`




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295109922
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   CloseableRegistry closeableRegistry = new CloseableRegistry();
 
 Review comment:
   If we use `final` for some local variables, it is better to apply it to the 
others as well, for consistency.
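
   As a small illustration of the consistency being asked for, the sketch below marks every local that is assigned once as `final`, rather than only some of them. The class and method names are hypothetical, and the arithmetic is an assumption made for the example; only the two constant values are taken from the quoted test.

```java
public class FinalLocalsSketch {

    // Illustrative arithmetic only: the constants mirror locals in the quoted
    // test, but this formula is an assumption made for the sketch.
    static int buffersForGate(final int numberOfChannels) {
        final int buffersPerChannel = 2;
        final int extraNetworkBuffersPerGate = 8;
        final int exclusiveBuffers = numberOfChannels * buffersPerChannel;
        return exclusiveBuffers + extraNetworkBuffersPerGate;
    }

    public static void main(String[] args) {
        // Every single-assignment local is final, so the style is uniform.
        final int numberOfChannels = 2;
        final int total = buffersForGate(numberOfChannels);
        System.out.println("buffers for gate: " + total);
    }
}
```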




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295109987
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setIsCreditBased(true).build();
+
+   final SingleInputGate inputGate1 = new SingleInputGateBuilder()
+   .setNumberOfChannels(2)
+   .setIsCreditBased(true)
 
 Review comment:
   remove `.setIsCreditBased(true)` 


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295109739
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
+
+   @Test
+   public void testBufferUsageMetrics() throws IOException, InterruptedException {
+
+   CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setIsCreditBased(true).build();
 
 Review comment:
   `setIsCreditBased(true)` can be omitted because the builder's default value 
is already `true`.
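
To illustrate why the explicit call is redundant, here is a minimal sketch. `EnvBuilder` and `Env` are hypothetical stand-ins, not Flink's `NettyShuffleEnvironmentBuilder`; the `true` default is assumed from the comment above.

```java
// Hypothetical stand-ins; not Flink's NettyShuffleEnvironmentBuilder.
final class BuilderDefaultSketch {

    /** Minimal product of the builder; records only the flag of interest. */
    static final class Env {
        final boolean creditBased;

        Env(boolean creditBased) {
            this.creditBased = creditBased;
        }
    }

    static final class EnvBuilder {
        // Assumed default, taken from the review comment: credit-based mode is on.
        private boolean isCreditBased = true;

        EnvBuilder setIsCreditBased(boolean isCreditBased) {
            this.isCreditBased = isCreditBased;
            return this;
        }

        Env build() {
            return new Env(isCreditBased);
        }
    }

    public static void main(String[] args) {
        Env implicit = new EnvBuilder().build();
        Env explicit = new EnvBuilder().setIsCreditBased(true).build();
        // Both builds carry the same setting, so the explicit call adds nothing.
        System.out.println(implicit.creditBased == explicit.creditBased);
    }
}
```

Only a test that needs the non-default value would have to call the setter.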


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295109278
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
 
 Review comment:
   `Tests the metrics for input buffers usage`


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295109109
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for input buffer usage related metrics.
+ */
+public class InputBuffersMetricsTest {
 
 Review comment:
   The newly added test class should extend `TestLogger`.


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295108937
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
 ##
 @@ -18,38 +18,33 @@
 
 package org.apache.flink.runtime.io.network.metrics;
 
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 
 /**
  * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s.
  */
-public class InputBufferPoolUsageGauge implements Gauge {
-
-   private final SingleInputGate[] inputGates;
+public class InputBufferPoolUsageGauge extends AbstractBuffersUsageGauge {
 
public InputBufferPoolUsageGauge(SingleInputGate[] inputGates) {
-   this.inputGates = inputGates;
+   super(inputGates);
 
 Review comment:
   `checkNotNull`
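
A minimal sketch of the suggested null check at the `super(...)` call. It uses `java.util.Objects.requireNonNull` as a self-contained substitute for Flink's `Preconditions.checkNotNull`; the `Gate`, `UsageGauge` and `PoolUsageGauge` types are hypothetical placeholders for the classes in the diff.

```java
import java.util.Objects;

final class NullCheckSketch {

    /** Hypothetical placeholder for SingleInputGate. */
    static final class Gate { }

    /** Hypothetical placeholder for the abstract gauge base class. */
    abstract static class UsageGauge {
        // final: assigned exactly once, in the constructor
        protected final Gate[] inputGates;

        UsageGauge(Gate[] inputGates) {
            this.inputGates = inputGates;
        }
    }

    static final class PoolUsageGauge extends UsageGauge {
        PoolUsageGauge(Gate[] inputGates) {
            // Fail fast at construction instead of later, when the gauge is polled.
            super(Objects.requireNonNull(inputGates, "inputGates"));
        }
    }

    /** Returns true if constructing the gauge with null is rejected. */
    static boolean rejectsNull() {
        try {
            new PoolUsageGauge(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull());
    }
}
```

The check could equally live in the base-class constructor, so that every subclass gets it.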


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295108731
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Abstract gauge implementation for calculate the buffer usage percent.
+ */
+public abstract class AbstractBuffersUsageGauge implements Gauge {
+
+   protected SingleInputGate[] inputGates;
+
+   @VisibleForTesting
+   public abstract int calculateUsedBuffers(SingleInputGate inputGate);
+
+   @VisibleForTesting
+   public abstract int calculateBufferPoolSize(SingleInputGate inputGate);
+
+   AbstractBuffersUsageGauge(SingleInputGate[] inputGates) {
+   this.inputGates = inputGates;
+   }
+
+   @Override
+   public Float getValue() {
+   int usedBuffers = 0;
+   int bufferPoolSize = 0;
+
+   for (SingleInputGate inputGate : inputGates) {
 
 Review comment:
   I have had a concern about the case of multiple input gates. Each 
`SingleInputGate` corresponds to a different upstream `JobVertex`. If we 
calculate one value across all the gates, it is difficult to distinguish the 
usage of an individual gate for performance tuning.
   
   Another option is to calculate the value separately for each gate and take 
the maximum as the final value, if we agree that the maximum usage is the more 
relevant signal for performance tuning. I am not quite sure about it. 
WDYT? @pnowojski
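
The two options can be compared with a small sketch. `GateStats` is a hypothetical per-gate snapshot (buffers in use, buffers available), not a Flink type, and the numbers are made up for illustration.

```java
final class GateUsageSketch {

    /** Hypothetical per-gate snapshot: buffers in use and buffers available. */
    static final class GateStats {
        final int usedBuffers;
        final int totalBuffers;

        GateStats(int usedBuffers, int totalBuffers) {
            this.usedBuffers = usedBuffers;
            this.totalBuffers = totalBuffers;
        }
    }

    /** Option 1: a single ratio over the summed counts of all gates. */
    static float aggregateUsage(GateStats[] gates) {
        int used = 0;
        int total = 0;
        for (GateStats gate : gates) {
            used += gate.usedBuffers;
            total += gate.totalBuffers;
        }
        return total == 0 ? 0.0f : ((float) used) / total;
    }

    /** Option 2: one ratio per gate, reporting the maximum. */
    static float maxUsage(GateStats[] gates) {
        float max = 0.0f;
        for (GateStats gate : gates) {
            if (gate.totalBuffers != 0) {
                max = Math.max(max, ((float) gate.usedBuffers) / gate.totalBuffers);
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // One nearly saturated gate and one nearly idle gate.
        GateStats[] gates = {new GateStats(9, 10), new GateStats(1, 10)};
        System.out.println("aggregate = " + aggregateUsage(gates));
        System.out.println("max       = " + maxUsage(gates));
    }
}
```

With one gate at 9 of 10 buffers and another at 1 of 10, the aggregate is 10 of 20, which hides the saturated gate, while the per-gate maximum still reports 9 of 10.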


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295107199
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBufferPoolUsageGauge.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s under credit based mode.
+ */
+public class CreditBasedInputBufferPoolUsageGauge extends AbstractBuffersUsageGauge {
 
 Review comment:
   This class should be renamed to `CreditBasedInputBuffersUsageGauge`, because 
the exclusive buffers have no pool and they are also covered by this gauge. 
The Javadoc above should be adjusted as well.
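
As a rough sketch of why "pool" is a misnomer here: a combined gauge would count both floating buffers (taken from a pool) and exclusive buffers (assigned per channel, no pool). The code assumes the combined value is simply the summed used count over the summed capacity; the PR's actual computation is not shown in this comment, and `Gate` and `BufferCounter` are hypothetical types.

```java
final class CombinedUsageSketch {

    /** Hypothetical gate snapshot with separate floating and exclusive counts. */
    static final class Gate {
        final int floatingUsed;
        final int floatingTotal;
        final int exclusiveUsed;
        final int exclusiveTotal;

        Gate(int floatingUsed, int floatingTotal, int exclusiveUsed, int exclusiveTotal) {
            this.floatingUsed = floatingUsed;
            this.floatingTotal = floatingTotal;
            this.exclusiveUsed = exclusiveUsed;
            this.exclusiveTotal = exclusiveTotal;
        }
    }

    /** Hypothetical counterpart of the two abstract calculate* methods. */
    interface BufferCounter {
        int used(Gate gate);
        int total(Gate gate);
    }

    static final BufferCounter FLOATING = new BufferCounter() {
        @Override public int used(Gate gate) { return gate.floatingUsed; }
        @Override public int total(Gate gate) { return gate.floatingTotal; }
    };

    static final BufferCounter EXCLUSIVE = new BufferCounter() {
        @Override public int used(Gate gate) { return gate.exclusiveUsed; }
        @Override public int total(Gate gate) { return gate.exclusiveTotal; }
    };

    /** Assumed combination rule: sum both buffer kinds before dividing. */
    static float combinedUsage(Gate[] gates) {
        int used = 0;
        int total = 0;
        for (Gate gate : gates) {
            used += FLOATING.used(gate) + EXCLUSIVE.used(gate);
            total += FLOATING.total(gate) + EXCLUSIVE.total(gate);
        }
        return total == 0 ? 0.0f : ((float) used) / total;
    }

    public static void main(String[] args) {
        // 2 of 8 floating buffers and 4 of 4 exclusive buffers in use.
        Gate[] gates = {new Gate(2, 8, 4, 4)};
        System.out.println(combinedUsage(gates));
    }
}
```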


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295107501
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
 ##
 @@ -303,7 +309,18 @@ private void registerInputMetrics(MetricGroup inputGroup, MetricGroup buffersGro
InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
}
buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
-   buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+
+   if (config.isCreditBased()) {
 
 Review comment:
   Note: in another PR, #8485, the metrics operations are refactored into a metric 
factory. If that PR is merged first, we might need to rebase and adjust this part.


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295106860
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Abstract gauge implementation for calculate the buffer usage percent.
+ */
+public abstract class AbstractBuffersUsageGauge implements Gauge {
+
+   protected SingleInputGate[] inputGates;
 
 Review comment:
   final


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295106771
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBufferPoolUsageGauge.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s under credit based mode.
+ */
+public class CreditBasedInputBufferPoolUsageGauge extends AbstractBuffersUsageGauge {
+
+   private FloatingBuffersUsageGauge floatingBuffersUsageGauge;
+   private ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;
 
 Review comment:
   ditto: final


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295106744
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBufferPoolUsageGauge.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s under credit based mode.
+ */
+public class CreditBasedInputBufferPoolUsageGauge extends AbstractBuffersUsageGauge {
+
+   private FloatingBuffersUsageGauge floatingBuffersUsageGauge;
 
 Review comment:
   This field should be `final`.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295106517
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBufferPoolUsageGauge.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s under credit based mode.
+ */
+public class CreditBasedInputBufferPoolUsageGauge extends AbstractBuffersUsageGauge {
+
+   private FloatingBuffersUsageGauge floatingBuffersUsageGauge;
+   private ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;
+
+   public CreditBasedInputBufferPoolUsageGauge(FloatingBuffersUsageGauge floatingBuffersUsageGauge,
 
 Review comment:
   Parameter formatting: every parameter should be on a separate line, with indentation.
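
   As a rough illustration of the layout meant here, a minimal sketch with every constructor parameter on its own line. `BuffersGauge`, `Gate` and `CombinedBuffersGauge` are hypothetical stand-ins, not the Flink classes, and the indentation width is only illustrative.

```java
// Hypothetical stand-ins for the PR's gauge and gate types; not Flink classes.
interface BuffersGauge {}

final class Gate {}

final class CombinedBuffersGauge {
    private final BuffersGauge floatingGauge;
    private final BuffersGauge exclusiveGauge;
    private final Gate[] inputGates;

    // Once the parameter list is broken, each parameter gets its own
    // indented line instead of sharing the line of the declaration.
    CombinedBuffersGauge(
            BuffersGauge floatingGauge,
            BuffersGauge exclusiveGauge,
            Gate[] inputGates) {
        this.floatingGauge = floatingGauge;
        this.exclusiveGauge = exclusiveGauge;
        this.inputGates = inputGates;
    }

    BuffersGauge floatingGauge() {
        return floatingGauge;
    }

    BuffersGauge exclusiveGauge() {
        return exclusiveGauge;
    }

    int inputGateCount() {
        return inputGates.length;
    }
}
```

   The same layout would apply at call sites whose argument list is broken across lines.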




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295106226
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
 ##
 @@ -84,6 +87,9 @@
private static final String METRIC_OUTPUT_POOL_USAGE = "outPoolUsage";
private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+   private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = "floatingBuffersUsage";
 
 Review comment:
   Consider adding an `input` prefix, i.e. `inputFloatingBuffersUsage` and `inputExclusiveBuffersUsage`.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295105319
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ExclusiveBuffersUsageGauge.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the exclusive buffers usage gauge for {@link SingleInputGate}s.
+ */
+public class ExclusiveBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+   public ExclusiveBuffersUsageGauge(SingleInputGate[] inputGates) {
+      super(inputGates);
+   }
+
+   @Override
+   public int calculateUsedBuffers(SingleInputGate inputGate) {
+      int usedBuffers = 0;
+      for (InputChannel ic : inputGate.getInputChannels().values()) {
+         if (ic instanceof RemoteInputChannel) {
+            usedBuffers += ((RemoteInputChannel) ic).unsynchronizedGetExclusiveBuffersUsed();
+         }
+      }
+      return usedBuffers;
+   }
+
+   @Override
+   public int calculateBufferPoolSize(SingleInputGate inputGate) {
+      int requestedExclusiveBuffers = 0;
 
 Review comment:
   Maybe rename this to `totalExclusiveBuffers`.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295104613
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Abstract gauge implementation for calculate the buffer usage percent.
+ */
+public abstract class AbstractBuffersUsageGauge implements Gauge {
+
+   protected SingleInputGate[] inputGates;
+
+   @VisibleForTesting
+   public abstract int calculateUsedBuffers(SingleInputGate inputGate);
+
+   @VisibleForTesting
+   public abstract int calculateBufferPoolSize(SingleInputGate inputGate);
 
 Review comment:
   How about renaming `calculateBufferPoolSize` to `calculateTotalBuffers`? The
exclusive case has no buffer pool, so the current name is not general enough.
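
   To show why the more general name fits, a minimal sketch of an abstract gauge built around `calculateTotalBuffers`. `Gate`, `AbstractUsageGauge` and `CountingUsageGauge` are hypothetical stand-ins, not the Flink classes, and the way `getValue()` sums over all gates and divides used by total is an assumption about the intended behaviour, not code taken from the PR.

```java
// Hypothetical stand-in for SingleInputGate: it only carries two counters.
final class Gate {
    final int usedBuffers;
    final int totalBuffers;

    Gate(int usedBuffers, int totalBuffers) {
        this.usedBuffers = usedBuffers;
        this.totalBuffers = totalBuffers;
    }
}

// Sketch of an abstract usage gauge; the name says nothing about a buffer pool.
abstract class AbstractUsageGauge {
    protected final Gate[] inputGates;

    AbstractUsageGauge(Gate[] inputGates) {
        this.inputGates = inputGates;
    }

    abstract int calculateUsedBuffers(Gate gate);

    // "Total" covers pool-backed (floating) and pool-less (exclusive) buffers alike.
    abstract int calculateTotalBuffers(Gate gate);

    // Assumed semantics: used / total over all gates, 0 when there are no buffers.
    float getValue() {
        int used = 0;
        int total = 0;
        for (Gate gate : inputGates) {
            used += calculateUsedBuffers(gate);
            total += calculateTotalBuffers(gate);
        }
        return total == 0 ? 0.0f : ((float) used) / total;
    }
}

// Trivial subclass that reads the counters straight from the stand-in gate.
final class CountingUsageGauge extends AbstractUsageGauge {
    CountingUsageGauge(Gate[] inputGates) {
        super(inputGates);
    }

    @Override
    int calculateUsedBuffers(Gate gate) {
        return gate.usedBuffers;
    }

    @Override
    int calculateTotalBuffers(Gate gate) {
        return gate.totalBuffers;
    }
}
```

   With this shape, an exclusive-buffers subclass can return the number of exclusive buffers per channel without pretending that a pool exists.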




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295103836
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/AbstractBuffersUsageGauge.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Abstract gauge implementation for calculate the buffer usage percent.
 
 Review comment:
   for calculating the buffers




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295103038
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/FloatingBuffersUsageGauge.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the floating buffers usage gauge for {@link SingleInputGate}s.
+ * real used buffers which are occupied by intermediate data / localBufferPool size
 
 Review comment:
   The line `real used buffers which are occupied by intermediate data / localBufferPool size`
does not seem appropriate here and could be removed.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295102591
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/FloatingBuffersUsageGauge.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the floating buffers usage gauge for {@link SingleInputGate}s.
+ * real used buffers which are occupied by intermediate data / localBufferPool size
+ */
+public class FloatingBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+   public FloatingBuffersUsageGauge(SingleInputGate[] inputGates) {
+   super(inputGates);
 
 Review comment:
   ditto: `checkNotNull`




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295102507
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBufferPoolUsageGauge.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s under credit based mode.
+ */
+public class CreditBasedInputBufferPoolUsageGauge extends AbstractBuffersUsageGauge {
+
+   private FloatingBuffersUsageGauge floatingBuffersUsageGauge;
+   private ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;
+
+   public CreditBasedInputBufferPoolUsageGauge(FloatingBuffersUsageGauge floatingBuffersUsageGauge,
+         ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
+         SingleInputGate[] inputGates) {
+      super(inputGates);
 
 Review comment:
   Add `checkNotNull` for the parameters.
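
   A minimal sketch of the kind of null check meant here. It uses the JDK's `Objects.requireNonNull` as a stand-in so that it compiles without Flink on the classpath; in the PR itself the call would presumably be Flink's `Preconditions.checkNotNull`. `NullCheckedGauge` and its `Object`-typed parameters are hypothetical placeholders for the real gauge and gate types.

```java
import java.util.Objects;

// Hypothetical stand-in for the combined gauge; parameter types are placeholders.
final class NullCheckedGauge {
    private final Object floatingGauge;
    private final Object exclusiveGauge;
    private final Object[] inputGates;

    NullCheckedGauge(
            Object floatingGauge,
            Object exclusiveGauge,
            Object[] inputGates) {
        // Fail fast at construction time instead of on the first metric read.
        this.floatingGauge = Objects.requireNonNull(floatingGauge, "floatingGauge");
        this.exclusiveGauge = Objects.requireNonNull(exclusiveGauge, "exclusiveGauge");
        this.inputGates = Objects.requireNonNull(inputGates, "inputGates");
    }

    int inputGateCount() {
        return inputGates.length;
    }

    boolean hasBothGauges() {
        return floatingGauge != null && exclusiveGauge != null;
    }
}
```

   Checking in the constructor also pairs well with `final` fields, since each field is then known to be non-null for the lifetime of the gauge.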




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295102555
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ExclusiveBuffersUsageGauge.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+/**
+ * Gauge metric measuring the exclusive buffers usage gauge for {@link SingleInputGate}s.
+ */
+public class ExclusiveBuffersUsageGauge extends AbstractBuffersUsageGauge {
+
+   public ExclusiveBuffersUsageGauge(SingleInputGate[] inputGates) {
+   super(inputGates);
 
 Review comment:
   ditto: `checkNotNull(inputGates)`




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295101589
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
 ##
 @@ -303,7 +309,18 @@ private void registerInputMetrics(MetricGroup inputGroup, MetricGroup buffersGro
       InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
    }
    buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
-   buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+
+   if (config.isCreditBased()) {
+      FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+      ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+
+      buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, exclusiveBuffersUsageGauge);
+      buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, floatingBuffersUsageGauge);
+      buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new CreditBasedInputBufferPoolUsageGauge(floatingBuffersUsageGauge,
+         exclusiveBuffersUsageGauge, inputGates));
 
 Review comment:
   If the parameters are broken across several lines, put every parameter on its
own line.




[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-18 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r295100656
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
 ##
 @@ -84,6 +87,9 @@
private static final String METRIC_OUTPUT_POOL_USAGE = "outPoolUsage";
private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+   private static final String METRIC_INPUT_FLOATING_BUFFERS_USAGE = "floatingBuffersUsage";
+   private static final String METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE = "exclusiveBuffersUsage";
+
 
 Review comment:
   remove this extra empty line

