[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-11-02 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628405#comment-15628405]

ASF GitHub Bot commented on FLINK-4923:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2727


> Expose input/output buffers and bufferPool usage as a metric for a Task
> ---
>
> Key: FLINK-4923
> URL: https://issues.apache.org/jira/browse/FLINK-4923
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> We should expose the following Metrics on the TaskIOMetricGroup:
>  1. Buffers.inputQueueLength: received buffers of InputGates for a task
>  2. Buffers.outputQueueLength: buffers of produced ResultPartitions for a task
>  3. Buffers.inPoolUsage: usage of InputGates buffer pool for a task
>  4. Buffers.outPoolUsage: usage of produced ResultPartitions buffer pool for a task
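A minimal sketch of how the four proposed gauges could hang off a `buffers` child group. It assumes a hypothetical map-backed `SimpleMetricGroup` and `SimpleGauge` in place of Flink's own `MetricGroup` and `Gauge` (this is not Flink's API), and it uses `Supplier`s as placeholders for reading the task's InputGates and ResultPartitions. Each value is computed only when a reporter reads the gauge.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a metrics gauge: the value is computed on each read.
interface SimpleGauge<T> {
    T getValue();
}

// Hypothetical, map-backed metric group (NOT Flink's MetricGroup API).
// Child groups share one registry and prefix metric names with "group.".
class SimpleMetricGroup {
    private final String prefix;
    private final Map<String, SimpleGauge<?>> registry;

    SimpleMetricGroup(String prefix, Map<String, SimpleGauge<?>> registry) {
        this.prefix = prefix;
        this.registry = registry;
    }

    SimpleMetricGroup addGroup(String name) {
        return new SimpleMetricGroup(prefix + name + ".", registry);
    }

    <T> void gauge(String name, SimpleGauge<T> gauge) {
        registry.put(prefix + name, gauge);
    }
}

class BufferMetricsSketch {
    // Registers the four proposed gauges under a "buffers" child group.
    // The suppliers are placeholders for reading the task's gates and partitions.
    static void register(SimpleMetricGroup ioGroup,
                         Supplier<Integer> inputQueueLength,
                         Supplier<Integer> outputQueueLength,
                         Supplier<Float> inPoolUsage,
                         Supplier<Float> outPoolUsage) {
        SimpleMetricGroup buffers = ioGroup.addGroup("buffers");
        buffers.gauge("inputQueueLength", inputQueueLength::get);
        buffers.gauge("outputQueueLength", outputQueueLength::get);
        buffers.gauge("inPoolUsage", inPoolUsage::get);
        buffers.gauge("outPoolUsage", outPoolUsage::get);
    }
}
```

Because gauges are evaluated lazily, registration adds nothing to the data path; any cost (such as taking a lock) shows up only when a reporter reads the values.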



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-11-02 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628332#comment-15628332]

ASF GitHub Bot commented on FLINK-4923:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2727
  
will merge this later on


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-31 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15624170#comment-15624170]

ASF GitHub Bot commented on FLINK-4923:
---

Github user zhuhaifengleon commented on a diff in the pull request:

https://github.com/apache/flink/pull/2727#discussion_r85869851
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java ---
@@ -80,18 +76,25 @@ public Meter getNumBytesOutRateMeter() {
return numBytesOutRate;
}
 
-   public MetricGroup getBuffersGroup() {
-   return buffers;
-   }
-
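// ------------------------------------------------------------------------
//  metrics of Buffers group
// ------------------------------------------------------------------------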
 
/**
+* initialize Buffer Metrics for a task
+*/
+   public void initializeBufferMetrics(Task task) {
+   final MetricGroup buffers = addGroup("buffers");
+   buffers.gauge("inputQueueLength", new InputBuffersGauge(task));
+   buffers.gauge("outputQueueLength", new OutputBuffersGauge(task));
+   buffers.gauge("inPoolUsage", new InputBufferPoolUsageGauge(task));
+   buffers.gauge("outPoolUsage", new OutputBufferPoolUsageGauge(task));
+   }
+
+   /**
 * Input received buffers gauge of a task
 */
-   public static final class InputBuffersGauge implements Gauge {
+   private final class InputBuffersGauge implements Gauge {
--- End diff --

InputBuffersGauge is inaccessible to any class except TaskIOMetricGroup, 
so I changed it from public to private. It works whether or not it is 
static; to avoid holding a reference to the outer class, static is preferable.
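The `static` question comes down to the implicit outer reference. The self-contained sketch below (with hypothetical `TaskStub` and `IOMetricGroupSketch` classes, not Flink code) shows that both variants work as `private` nested classes, but only the non-static one captures its enclosing instance; the static one can even be created from a static method.

```java
// Hypothetical minimal task exposing a queue length.
class TaskStub {
    private final int queued;

    TaskStub(int queued) {
        this.queued = queued;
    }

    int getNumberOfQueuedBuffers() {
        return queued;
    }
}

class IOMetricGroupSketch {
    // Non-static inner class: every instance also holds a hidden reference
    // to the enclosing IOMetricGroupSketch (IOMetricGroupSketch.this).
    private final class InnerInputBuffersGauge {
        private final TaskStub task;

        InnerInputBuffersGauge(TaskStub task) {
            this.task = task;
        }

        Integer getValue() {
            return task.getNumberOfQueuedBuffers();
        }
    }

    // Static nested class: no implicit outer reference, only the task it is given.
    private static final class StaticInputBuffersGauge {
        private final TaskStub task;

        StaticInputBuffersGauge(TaskStub task) {
            this.task = task;
        }

        Integer getValue() {
            return task.getNumberOfQueuedBuffers();
        }
    }

    // Both classes are private, so only IOMetricGroupSketch can create them.
    int readInner(TaskStub task) {
        return new InnerInputBuffersGauge(task).getValue();
    }

    // Needs no IOMetricGroupSketch instance, which only the static class allows.
    static int readStatic(TaskStub task) {
        return new StaticInputBuffersGauge(task).getValue();
    }
}
```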


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-31 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15622283#comment-15622283]

ASF GitHub Bot commented on FLINK-4923:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2727#discussion_r85746750
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java ---
@@ -80,18 +76,25 @@ public Meter getNumBytesOutRateMeter() {
return numBytesOutRate;
}
 
-   public MetricGroup getBuffersGroup() {
-   return buffers;
-   }
-
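// ------------------------------------------------------------------------
//  metrics of Buffers group
// ------------------------------------------------------------------------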
 
/**
+* initialize Buffer Metrics for a task
+*/
+   public void initializeBufferMetrics(Task task) {
+   final MetricGroup buffers = addGroup("buffers");
+   buffers.gauge("inputQueueLength", new InputBuffersGauge(task));
+   buffers.gauge("outputQueueLength", new OutputBuffersGauge(task));
+   buffers.gauge("inPoolUsage", new InputBufferPoolUsageGauge(task));
+   buffers.gauge("outPoolUsage", new OutputBufferPoolUsageGauge(task));
+   }
+
+   /**
 * Input received buffers gauge of a task
 */
-   public static final class InputBuffersGauge implements Gauge {
+   private final class InputBuffersGauge implements Gauge {
--- End diff --

why did you remove the `static` keyword?


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-31 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15621599#comment-15621599]

ASF GitHub Bot commented on FLINK-4923:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2727#discussion_r85698371
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -111,6 +111,11 @@ public int getNumBuffers() {
}
 
@Override
+   public int getNumberOfUsedBuffers() {
+   return numberOfRequestedMemorySegments - getNumberOfAvailableMemorySegments();
--- End diff --

instead of `getNumberOfAvailableMemorySegments()` we could use 
`availableMemorySegments.size()` so we never enter a synchronized block and 
affect the LocalBufferPool.
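A simplified sketch of the suggestion, assuming a hypothetical `BufferPoolSketch` that keeps only the two fields named in the diff: the data path takes the lock on `availableMemorySegments`, while the metric reads `availableMemorySegments.size()` directly. The usage ratio (used buffers divided by pool size) is an assumed definition, not one taken from the thread.

```java
import java.util.ArrayDeque;

// Simplified, hypothetical stand-in for LocalBufferPool; only the field
// names follow the diff. Memory segments are modeled as byte arrays.
class BufferPoolSketch {
    private final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
    private final int numBuffers;
    private int numberOfRequestedMemorySegments;

    BufferPoolSketch(int numBuffers) {
        this.numBuffers = numBuffers;
    }

    // Data path: guarded by the pool's lock.
    byte[] requestSegment() {
        synchronized (availableMemorySegments) {
            byte[] segment = availableMemorySegments.poll();
            if (segment == null && numberOfRequestedMemorySegments < numBuffers) {
                numberOfRequestedMemorySegments++;
                segment = new byte[32];
            }
            return segment;
        }
    }

    void recycle(byte[] segment) {
        synchronized (availableMemorySegments) {
            availableMemorySegments.add(segment);
        }
    }

    // Metrics path: reads both values without taking the lock, so a reporter
    // never blocks the data path. The result can be momentarily inaccurate.
    int getNumberOfUsedBuffers() {
        return numberOfRequestedMemorySegments - availableMemorySegments.size();
    }

    // Assumed definition of "pool usage": used buffers / pool size.
    float getUsage() {
        return numBuffers == 0 ? 0f : (float) getNumberOfUsedBuffers() / numBuffers;
    }
}
```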


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-30 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15620210#comment-15620210]

ASF GitHub Bot commented on FLINK-4923:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2693


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-27 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15614058#comment-15614058]

ASF GitHub Bot commented on FLINK-4923:
---

Github user zhuhaifengleon commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85463825
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -347,6 +348,12 @@ public Task(
 
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+
+   // add metrics for buffers
+   this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
--- End diff --

Stephan, have you already merged this? If you have, I will do that in a 
follow-up patch after discussing it with Chesnay.


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-27 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611676#comment-15611676]

ASF GitHub Bot commented on FLINK-4923:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85322207
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -347,6 +348,12 @@ public Task(
 
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+
+   // add metrics for buffers
+   this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
--- End diff --

I have already rebased and merged this.
How about doing that in a follow-up patch?


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-27 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611401#comment-15611401]

ASF GitHub Bot commented on FLINK-4923:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85304524
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -347,6 +348,12 @@ public Task(
 
// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+
+   // add metrics for buffers
+   this.metrics.getIOMetricGroup().getBuffersGroup().gauge("inputQueueLength", new TaskIOMetricGroup.InputBuffersGauge(this));
--- End diff --

we could have a static initializer method in the TaskIOMetricGroup for 
this.


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-27 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611358#comment-15611358]

ASF GitHub Bot commented on FLINK-4923:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2693
  
This looks good, thanks!
Merging this...


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-27 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15611068#comment-15611068]

ASF GitHub Bot commented on FLINK-4923:
---

Github user zhuhaifengleon commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85282572
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1264,4 +1271,76 @@ public void run() {
}
}
}
+
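+   // ------------------------------------------------------------------------
+   //  metrics
+   // ------------------------------------------------------------------------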
+
+   private class InputBuffersGauge implements Gauge {
--- End diff --

I have pulled these classes out into TaskIOMetricGroup, since the buffer 
metrics belong to the Task I/O scope.


> Expose input/output buffers and bufferPool usage as a metric for a Task
> ---
>
> Key: FLINK-4923
> URL: https://issues.apache.org/jira/browse/FLINK-4923
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> We should expose the following Metrics on the TaskIOMetricGroup:
>  1. Buffers.numIn: received buffers of InputGates for a task
>  2. Buffers.numOut: buffers of produced ResultPartitions for a task
>  3. Buffers.InPoolUsage: usage of InputGates buffer pool for a task
>  4. Buffers.OutPoolUsage: usage of produced ResultPartitions buffer pool for a task



[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-26 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15607974#comment-15607974]

ASF GitHub Bot commented on FLINK-4923:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2693
  
Good addition, with some requested changes.

I would suggest some name changes for the Gauges:
  - "numIn" --> "inputQueueLength"
  - "numOut" --> "outputQueueLength"
  - "InPoolUsage" --> "inPoolUsage"
  - "OutPoolUsage" --> "outPoolUsage"


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-26 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15607948#comment-15607948]

ASF GitHub Bot commented on FLINK-4923:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85079869
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
@@ -227,4 +227,10 @@ public String toString() {
getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
spillWriter != null);
}
+
+   public int getNumberOfQueuedBuffers() {
+   synchronized (buffers) {
--- End diff --

Same here, try to avoid synchronization.


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-26 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15607946#comment-15607946]

ASF GitHub Bot commented on FLINK-4923:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85080328
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1264,4 +1271,76 @@ public void run() {
}
}
}
+
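+   // ------------------------------------------------------------------------
+   //  metrics
+   // ------------------------------------------------------------------------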
+
+   private class InputBuffersGauge implements Gauge {
--- End diff --

Can you pull these classes out into a separate file? The `Task` class is 
very large already.


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-26 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15607950#comment-15607950]

ASF GitHub Bot commented on FLINK-4923:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85080051
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
@@ -222,6 +226,20 @@ public int getPageSize() {
}
}
 
+   public int getNumberOfQueuedBuffers() {
+   int totalBuffers = 0;
+
+   for (Map.Entry entry: inputChannels.entrySet()) {
--- End diff --

I think this map may change asynchronously, so it is probably better to 
catch exceptions here and retry a few times. If it fails three times, 
return `-1` for "unknown".
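A sketch of the retry idea, assuming a hypothetical `Channel` interface and `String` keys in place of the real input-channel map. Note that `ConcurrentModificationException` is thrown only on a best-effort basis by non-concurrent collections such as `HashMap`, so this guards against the common case rather than guaranteeing a consistent count.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;

class QueuedBuffersSketch {
    private static final int MAX_ATTEMPTS = 3;

    // Hypothetical stand-in for an input channel that reports its queue length.
    interface Channel {
        int getNumberOfQueuedBuffers();
    }

    // Sums queue lengths over a map that another thread may modify.
    // If iteration fails with ConcurrentModificationException, it retries;
    // after three failed attempts it returns -1 for "unknown" instead of locking.
    static int getNumberOfQueuedBuffers(Map<String, Channel> inputChannels) {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                int total = 0;
                for (Channel channel : inputChannels.values()) {
                    total += channel.getNumberOfQueuedBuffers();
                }
                return total;
            } catch (ConcurrentModificationException e) {
                // The map changed while we iterated; try again.
            }
        }
        return -1;
    }
}
```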


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-26 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15607949#comment-15607949]

ASF GitHub Bot commented on FLINK-4923:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85079795
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -220,4 +220,10 @@ boolean registerListener(NotificationListener listener) {
throw new IllegalStateException("Already registered listener.");
}
}
+
+   public int getNumberOfQueuedBuffers() {
+   synchronized (buffers) {
--- End diff --

Can we avoid synchronization here? The metrics should never influence 
(block) the other code.
I would rather have the metrics be slightly off once in a while and avoid 
the lock here.


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-26 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15607947#comment-15607947]

ASF GitHub Bot commented on FLINK-4923:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2693#discussion_r85079818
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
@@ -228,6 +232,23 @@ public long getTotalNumberOfBytes() {
return totalNumberOfBytes;
}
 
+   public int getNumberOfQueuedBuffers() {
+   int totalBuffers = 0;
+
+   for (ResultSubpartition subpartition : subpartitions) {
+
+   if (subpartition instanceof PipelinedSubpartition) {
--- End diff --

If you add the `getNumberOfQueuedBuffers()` method to `ResultSubpartition` 
then you do not need to check and cast here.
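A sketch of the suggested refactoring, using hypothetical `*Sketch` classes rather than Flink's real ones: the method is declared once on the base class, each subclass answers from its own buffer queue (read without the lock, in line with the review comments about avoiding synchronization), and the partition simply sums over its subpartitions with no `instanceof` check or cast.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical hierarchy; the names only mirror Flink's classes.
abstract class ResultSubpartitionSketch {
    // Declared once on the base class, so callers need no instanceof checks.
    abstract int getNumberOfQueuedBuffers();
}

class PipelinedSubpartitionSketch extends ResultSubpartitionSketch {
    private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();

    void add(byte[] buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
        }
    }

    // Unsynchronized read: the metric never blocks the producer.
    @Override
    int getNumberOfQueuedBuffers() {
        return buffers.size();
    }
}

class SpillableSubpartitionSketch extends ResultSubpartitionSketch {
    private final ArrayList<byte[]> buffers = new ArrayList<>();

    void add(byte[] buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
        }
    }

    @Override
    int getNumberOfQueuedBuffers() {
        return buffers.size();
    }
}

class ResultPartitionSketch {
    private final List<ResultSubpartitionSketch> subpartitions;

    ResultPartitionSketch(List<ResultSubpartitionSketch> subpartitions) {
        this.subpartitions = subpartitions;
    }

    // Sums over all subpartitions through the base-class method.
    int getNumberOfQueuedBuffers() {
        int totalBuffers = 0;
        for (ResultSubpartitionSketch subpartition : subpartitions) {
            totalBuffers += subpartition.getNumberOfQueuedBuffers();
        }
        return totalBuffers;
    }
}
```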


[jira] [Commented] (FLINK-4923) Expose input/output buffers and bufferPool usage as a metric for a Task

2016-10-26 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-4923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15607919#comment-15607919]

ASF GitHub Bot commented on FLINK-4923:
---

GitHub user zhuhaifengleon opened a pull request:

https://github.com/apache/flink/pull/2693

[FLINK-4923] [Metrics] Expose input/output buffers and bufferPool usa…

The number of buffers and the usage of the input/output buffer pools of a 
task reflect whether the task is congested.
So we expose the following metrics as a Buffers MetricGroup on the 
TaskIOMetricGroup; all of these metrics are gauges:
1. numIn of Buffers: received buffers of all InputGates of a task
2. numOut of Buffers: buffers of all produced ResultPartitions of a task
3. InPoolUsage of Buffers: bufferPool usage of all InputGates of a task
4. OutPoolUsage of Buffers: bufferPool usage of all produced ResultPartitions of a task

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhuhaifengleon/flink FLINK-4923

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2693.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2693


commit 3beb65b8b3ee13d9901763c0fc2138613a36d154
Author: zhuhaifengleon 
Date:   2016-10-26T07:48:31Z

[FLINK-4923] [Metrics] Expose input/output buffers and bufferPool usage as a metric for a Task



