[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470265#comment-16470265
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4893


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Gary Yao
>Priority: Major
> Fix For: 1.5.0
>
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365341#comment-16365341
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4893
  
@zjureel Can you close this PR? Your changes were merged.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354127#comment-16354127
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5397




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353803#comment-16353803
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5397
  
Changes look good to me. Thanks for your contribution @GJL. Merging this PR.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353620#comment-16353620
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r166229314
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/VertexBackPressureStatusTest.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JobVertexBackPressureInfo.VertexBackPressureStatus}.
+ */
+public class VertexBackPressureStatusTest {
--- End diff --

Same here.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353619#comment-16353619
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r166226318
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
 ---
@@ -137,11 +151,28 @@ public static JobManagerServices fromConfiguration(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("jobmanager-future"));
 
+   final StackTraceSampleCoordinator stackTraceSampleCoordinator =
+   new StackTraceSampleCoordinator(futureExecutor, timeout.toMillis());
+   final BackPressureStatsTracker backPressureStatsTracker = new BackPressureStatsTracker(
+   stackTraceSampleCoordinator,
+   config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+   config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+   config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
+   Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+
+   futureExecutor.scheduleWithFixedDelay(
--- End diff --

Not sure whether we should schedule the cleanup here because that way we 
don't have access to the `ScheduledFuture` which we should cancel once we shut 
the `BackPressureStatsTracker` down. I think this should happen in the 
`JobMaster`.
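
The concern above can be illustrated with a minimal sketch, assuming plain `java.util.concurrent` types rather than Flink's own classes. `CleanupOwnerSketch` is a hypothetical stand-in for whichever component ends up owning the cleanup task; it is not code from the pull request. The point is only that the component which schedules the task keeps the returned `ScheduledFuture` and cancels it on shutdown.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical owner of a periodic cleanup task (illustration only, not Flink code). */
class CleanupOwnerSketch implements AutoCloseable {

    // Handle to the periodic task; without it the task could never be cancelled.
    private final ScheduledFuture<?> cleanupTask;

    CleanupOwnerSketch(ScheduledExecutorService executor, Runnable cleanup, long intervalMillis) {
        // Keep the handle returned by the scheduler instead of discarding it.
        this.cleanupTask = executor.scheduleWithFixedDelay(
            cleanup, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    boolean isCleanupCancelled() {
        return cleanupTask.isCancelled();
    }

    @Override
    public void close() {
        // Cancel on shutdown; 'false' lets a cleanup run that is in progress finish.
        cleanupTask.cancel(false);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try (CleanupOwnerSketch owner = new CleanupOwnerSketch(executor, () -> { }, 1000L)) {
            System.out.println("cancelled before close: " + owner.isCleanupCancelled());
        }
        executor.shutdownNow();
    }
}
```

If the task were scheduled in a static factory instead, the handle would have to be passed along to the owner separately, which is the awkwardness the review comment points at.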




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353616#comment-16353616
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r166229283
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/VertexBackPressureLevelTest.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JobVertexBackPressureInfo.VertexBackPressureLevel}.
+ */
+public class VertexBackPressureLevelTest {
--- End diff --

`extends TestLogger` is missing.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353618#comment-16353618
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r166228917
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureStatus;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH;
+import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.LOW;
+import static org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo.VertexBackPressureLevel.OK;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureHandlerTest {
--- End diff --

`extends TestLogger` is missing.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353617#comment-16353617
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r166228844
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -123,4 +126,19 @@
@RpcTimeout Time timeout) {
throw new UnsupportedOperationException();
}
+
+   /**
+* Requests the statistics on operator back pressure.
+*
+* @param jobId   Job for which the stats are requested.
+* @param jobVertexId JobVertex for which the stats are requested.
+* @return A Future to the {@link OperatorBackPressureStats} or {@code null} if the stats are
--- End diff --

nit: This should be "A future to the {@link OperatorBackPressureStatsResponse}"?




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353615#comment-16353615
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r166228267
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -305,6 +311,84 @@ public void postStop() throws Exception {
//  RPC methods
// 
==
 
+   @Override
+   public CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final Time timeout) {
+   return requestStackTraceSample(
+   executionAttemptId,
+   sampleId,
+   numSamples,
+   delayBetweenSamples,
+   maxStackTraceDepth,
+   new ArrayList<>(numSamples),
+   new CompletableFuture<>());
+   }
+
+   private CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final List currentTraces,
+   final CompletableFuture 
resultFuture) {
+
+   if (numSamples > 0) {
+   scheduleRunAsync(() -> runAsync(() -> {
--- End diff --

I think we don't need `() -> runAsync`.
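
As a rough illustration of the pattern under review, here is a minimal sketch that collects a fixed number of samples with a delay between them and completes a future once all are gathered. It assumes a plain single-threaded `ScheduledExecutorService` standing in for the RPC endpoint's main thread; `DelayedSamplerSketch` and its `collect` method are hypothetical names, not Flink's API. Because the scheduled task already runs on that executor, the body is passed directly and is not wrapped in a second asynchronous hop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical delayed sampler (illustration only, not Flink code). */
class DelayedSamplerSketch {

    /** Takes numSamples samples, each after delayMillis, and returns them in order. */
    static <T> CompletableFuture<List<T>> collect(
            ScheduledExecutorService mainThread,
            Supplier<T> sampler,
            int numSamples,
            long delayMillis) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        collect(mainThread, sampler, numSamples, delayMillis,
            new ArrayList<>(Math.max(0, numSamples)), result);
        return result;
    }

    private static <T> void collect(
            ScheduledExecutorService mainThread,
            Supplier<T> sampler,
            int remaining,
            long delayMillis,
            List<T> current,
            CompletableFuture<List<T>> result) {
        if (remaining <= 0) {
            // All samples gathered: hand the list to the caller.
            result.complete(current);
            return;
        }
        // The task is scheduled directly; it already runs on the target executor,
        // so no inner "() -> runAsync(...)" style wrapper is needed.
        mainThread.schedule(() -> {
            try {
                current.add(sampler.get());
                collect(mainThread, sampler, remaining - 1, delayMillis, current, result);
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        List<Integer> samples =
            DelayedSamplerSketch.<Integer>collect(executor, counter::incrementAndGet, 3, 5L).join();
        System.out.println(samples);
        executor.shutdownNow();
    }
}
```

Whether the first sample should be taken immediately or after the delay is a design choice this sketch does not try to match against the pull request.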




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351667#comment-16351667
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165835719
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -305,6 +311,84 @@ public void postStop() throws Exception {
//  RPC methods
// 
==
 
+   @Override
+   public CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final Time timeout) {
+   return requestStackTraceSample(
+   executionAttemptId,
+   sampleId,
+   numSamples,
+   delayBetweenSamples,
+   maxStackTraceDepth,
+   new ArrayList<>(numSamples),
+   new CompletableFuture<>());
+   }
+
+   private CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final List currentTraces,
+   final CompletableFuture 
resultFuture) {
+
+   if (numSamples > 0) {
+   getRpcService().getScheduledExecutor().schedule(() -> 
runAsync(() -> {
+   final Optional stackTrace 
= getStackTrace(executionAttemptId, maxStackTraceDepth);
--- End diff --

Fixed.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351666#comment-16351666
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165835514
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -305,6 +311,84 @@ public void postStop() throws Exception {
//  RPC methods
// 
==
 
+   @Override
+   public CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final Time timeout) {
+   return requestStackTraceSample(
+   executionAttemptId,
+   sampleId,
+   numSamples,
+   delayBetweenSamples,
+   maxStackTraceDepth,
+   new ArrayList<>(numSamples),
+   new CompletableFuture<>());
+   }
+
+   private CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final List currentTraces,
+   final CompletableFuture 
resultFuture) {
+
+   if (numSamples > 0) {
+   getRpcService().getScheduledExecutor().schedule(() -> 
runAsync(() -> {
+   final Optional stackTrace 
= getStackTrace(executionAttemptId, maxStackTraceDepth);
--- End diff --

done




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351660#comment-16351660
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165835481
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -305,6 +311,84 @@ public void postStop() throws Exception {
//  RPC methods
// 
==
 
+   @Override
+   public CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final Time timeout) {
+   return requestStackTraceSample(
+   executionAttemptId,
+   sampleId,
+   numSamples,
+   delayBetweenSamples,
+   maxStackTraceDepth,
+   new ArrayList<>(numSamples),
+   new CompletableFuture<>());
+   }
+
+   private CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final List currentTraces,
+   final CompletableFuture 
resultFuture) {
+
+   if (numSamples > 0) {
+   getRpcService().getScheduledExecutor().schedule(() -> 
runAsync(() -> {
--- End diff --

done




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351662#comment-16351662
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165835483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -867,6 +889,25 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
}
}
 
+   @Override
+   public CompletableFuture> 
getOperatorBackPressureStats(
+   final JobID jobId, final JobVertexID jobVertexId) {
+   final ExecutionJobVertex jobVertex = 
executionGraph.getJobVertex(jobVertexId);
+   if (jobVertex == null) {
+   return FutureUtils.completedExceptionally(new 
FlinkException("JobVertexID not found " +
+   jobVertexId));
+   }
+
+   final Optional 
operatorBackPressureStats =
+   
backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
+   if (!operatorBackPressureStats.isPresent() ||
+   backPressureStatsRefreshInterval <= 
System.currentTimeMillis() - operatorBackPressureStats.get().getEndTimestamp()) 
{
+   
backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+   return 
CompletableFuture.completedFuture(Optional.empty());
--- End diff --

done




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351663#comment-16351663
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165835486
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -313,6 +325,13 @@ public JobMaster(
.orElse(FutureUtils.completedExceptionally(new 
JobMasterException("The JobMaster has not been started with a REST 
endpoint.")));
 
this.metricQueryServicePath = metricQueryServicePath;
+   this.stackTraceSampleCoordinator = new 
StackTraceSampleCoordinator(rpcService.getExecutor(), 
rpcTimeout.toMilliseconds());
+   this.backPressureStatsTracker = new BackPressureStatsTracker(
--- End diff --

done




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351661#comment-16351661
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165835482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final VertexBackPressureStatus status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final VertexBackPressureLevel backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   protected final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus 
status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) 
VertexBackPressureLevel backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   public static JobVertexBackPressureInfo deprecated() {
+   return new JobVertexBackPressureInfo(
+   VertexBackPressureStatus.DEPRECATED,
--- End diff --

done




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351664#comment-16351664
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165835488
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -358,6 +361,18 @@ public void start() throws Exception {
}
}
 
+   @Override
+   public CompletableFuture> 
getOperatorBackPressureStats(
--- End diff --

done




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350465#comment-16350465
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165666773
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 ---
@@ -75,6 +76,17 @@ public void 
setDisconnectJobManagerConsumer(Consumer> d
return CompletableFuture.completedFuture(Acknowledge.get());
}
 
+   @Override
+   public CompletableFuture 
requestStackTraceSample(
--- End diff --

or maybe not




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348280#comment-16348280
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165298568
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -305,6 +311,84 @@ public void postStop() throws Exception {
//  RPC methods
// 
==
 
+   @Override
+   public CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final Time timeout) {
+   return requestStackTraceSample(
+   executionAttemptId,
+   sampleId,
+   numSamples,
+   delayBetweenSamples,
+   maxStackTraceDepth,
+   new ArrayList<>(numSamples),
+   new CompletableFuture<>());
+   }
+
+   private CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final List currentTraces,
+   final CompletableFuture 
resultFuture) {
+
+   if (numSamples > 0) {
+   getRpcService().getScheduledExecutor().schedule(() -> 
runAsync(() -> {
+   final Optional stackTrace 
= getStackTrace(executionAttemptId, maxStackTraceDepth);
--- End diff --

The first call to `getStackTrace` should not be delayed. I think it is enough 
to move this line out of the `schedule` call.
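
To illustrate the suggestion above, here is a minimal, self-contained sketch 
that takes the first sample right away and only delays the later ones. It is 
not the `TaskExecutor` code from the PR: `ImmediateFirstSampler`, `sample`, 
and `sampleOnce` are hypothetical names, and a plain 
`ScheduledExecutorService` stands in for the RPC service's scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sampler used for illustration only; not Flink code. */
final class ImmediateFirstSampler {

    /**
     * Takes numSamples samples. The first sample is taken synchronously in
     * the calling thread; every later sample is scheduled delayMillis after
     * the previous one.
     */
    static <T> CompletableFuture<List<T>> sample(
            Supplier<T> takeSample,
            int numSamples,
            long delayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        if (numSamples <= 0) {
            result.complete(new ArrayList<>());
            return result;
        }
        sampleOnce(takeSample, numSamples, delayMillis, scheduler,
                new ArrayList<>(numSamples), result);
        return result;
    }

    private static <T> void sampleOnce(
            Supplier<T> takeSample,
            int remaining,
            long delayMillis,
            ScheduledExecutorService scheduler,
            List<T> collected,
            CompletableFuture<List<T>> result) {
        try {
            // Sample first, outside of any schedule(...) call.
            collected.add(takeSample.get());
        } catch (Throwable t) {
            result.completeExceptionally(t);
            return;
        }
        if (remaining > 1) {
            // Only the follow-up samples wait for the delay.
            scheduler.schedule(
                    () -> sampleOnce(takeSample, remaining - 1, delayMillis,
                            scheduler, collected, result),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
        } else {
            result.complete(collected);
        }
    }
}
```

With this shape, a request for `n` samples waits for `n - 1` delays instead 
of `n`.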




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348267#comment-16348267
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165296204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final VertexBackPressureStatus status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final VertexBackPressureLevel backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   protected final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus 
status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) 
VertexBackPressureLevel backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   public static JobVertexBackPressureInfo deprecated() {
+   return new JobVertexBackPressureInfo(
+   VertexBackPressureStatus.DEPRECATED,
--- End diff --

I can do that, and I considered it, but this is how it is currently done. If 
the status is `deprecated`, the UI renders *Sampling in progress*.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348269#comment-16348269
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165296332
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -358,6 +361,18 @@ public void start() throws Exception {
}
}
 
+   @Override
+   public CompletableFuture> 
getOperatorBackPressureStats(
--- End diff --

True, I forgot about it.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348262#comment-16348262
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165292444
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final VertexBackPressureStatus status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final VertexBackPressureLevel backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   protected final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus 
status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) 
VertexBackPressureLevel backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   public static JobVertexBackPressureInfo deprecated() {
+   return new JobVertexBackPressureInfo(
+   VertexBackPressureStatus.DEPRECATED,
--- End diff --

Why do we explicitly say that the back pressure information is deprecated? 
Serving old information is better than nothing if we still have older back 
pressure information, right? The caller could still discard the information 
based on `endTimestamp` if it is too old.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348259#comment-16348259
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165290983
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -358,6 +361,18 @@ public void start() throws Exception {
}
}
 
+   @Override
+   public CompletableFuture> 
getOperatorBackPressureStats(
--- End diff --

I think `Optional` is not serializable. We should always return 
serializable objects as the result of an RPC.
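
A small sketch of the concern, assuming the RPC result is written with 
standard Java serialization (an assumption made for this example): 
`java.util.Optional` does not implement `Serializable`, so writing it fails, 
while a wrapper with a nullable field does not. `SerializableResultSketch` 
and `MaybeStats` are hypothetical names, not classes from the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Optional;

/** Hypothetical illustration; not part of the PR. */
final class SerializableResultSketch {

    /** Serializable stand-in for an optional result; a null payload means "no value". */
    static final class MaybeStats implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String stats; // may be null

        MaybeStats(String stats) {
            this.stats = stats;
        }

        /** Re-wraps the payload as an Optional on the receiving side. */
        Optional<String> asOptional() {
            return Optional.ofNullable(stats);
        }
    }

    /** Returns true if Java serialization accepts the given object. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Here `canSerialize(Optional.of("x"))` is false, whereas 
`canSerialize(new MaybeStats("x"))` is true; the receiver can call 
`asOptional()` to get the `Optional` view back.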




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348263#comment-16348263
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165294395
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -305,6 +311,84 @@ public void postStop() throws Exception {
//  RPC methods
// 
==
 
+   @Override
+   public CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final Time timeout) {
+   return requestStackTraceSample(
+   executionAttemptId,
+   sampleId,
+   numSamples,
+   delayBetweenSamples,
+   maxStackTraceDepth,
+   new ArrayList<>(numSamples),
+   new CompletableFuture<>());
+   }
+
+   private CompletableFuture 
requestStackTraceSample(
+   final ExecutionAttemptID executionAttemptId,
+   final int sampleId,
+   final int numSamples,
+   final Time delayBetweenSamples,
+   final int maxStackTraceDepth,
+   final List currentTraces,
+   final CompletableFuture 
resultFuture) {
+
+   if (numSamples > 0) {
+   getRpcService().getScheduledExecutor().schedule(() -> 
runAsync(() -> {
--- End diff --

This should be replaceable with `this.scheduleRunAsync(() -> ...)`




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348261#comment-16348261
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165291802
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -867,6 +889,25 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
}
}
 
+   @Override
+   public CompletableFuture> 
getOperatorBackPressureStats(
+   final JobID jobId, final JobVertexID jobVertexId) {
+   final ExecutionJobVertex jobVertex = 
executionGraph.getJobVertex(jobVertexId);
+   if (jobVertex == null) {
+   return FutureUtils.completedExceptionally(new 
FlinkException("JobVertexID not found " +
+   jobVertexId));
+   }
+
+   final Optional 
operatorBackPressureStats =
+   
backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
+   if (!operatorBackPressureStats.isPresent() ||
+   backPressureStatsRefreshInterval <= 
System.currentTimeMillis() - operatorBackPressureStats.get().getEndTimestamp()) 
{
+   
backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+   return 
CompletableFuture.completedFuture(Optional.empty());
--- End diff --

Why not return the last back-pressure result if there is an old one?
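
One way to read this suggestion, sketched with hypothetical types 
(`StaleStatsSketch`, `Stats`, and `Lookup` are not Flink classes): keep the 
staleness check from the diff above, but use it only to decide whether a new 
sample should be triggered, and hand back whatever is cached either way.

```java
import java.util.Optional;

/** Hypothetical illustration; not part of the PR. */
final class StaleStatsSketch {

    /** Stand-in for back pressure stats; only the end timestamp matters here. */
    static final class Stats {
        final long endTimestamp;

        Stats(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }
    }

    /** The stats to serve (possibly stale) and whether to start a new sample. */
    static final class Lookup {
        final Optional<Stats> stats;
        final boolean triggerRefresh;

        Lookup(Optional<Stats> stats, boolean triggerRefresh) {
            this.stats = stats;
            this.triggerRefresh = triggerRefresh;
        }
    }

    /**
     * Same staleness condition as in the diff, but stale stats are still
     * returned instead of being replaced by an empty result.
     */
    static Lookup lookup(
            Optional<Stats> cached, long refreshIntervalMillis, long nowMillis) {
        boolean stale = !cached.isPresent()
                || refreshIntervalMillis <= nowMillis - cached.get().endTimestamp;
        return new Lookup(cached, stale);
    }
}
```

A caller that cares about freshness can still compare `endTimestamp` against 
its own threshold and discard the result.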




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348260#comment-16348260
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165291604
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -313,6 +325,13 @@ public JobMaster(
.orElse(FutureUtils.completedExceptionally(new 
JobMasterException("The JobMaster has not been started with a REST 
endpoint.")));
 
this.metricQueryServicePath = metricQueryServicePath;
+   this.stackTraceSampleCoordinator = new 
StackTraceSampleCoordinator(rpcService.getExecutor(), 
rpcTimeout.toMilliseconds());
+   this.backPressureStatsTracker = new BackPressureStatsTracker(
--- End diff --

The `BackPressureStatsTracker` and the `StackTraceSampleCoordinator` could 
go into the `JobManagerServices`. That way, we would have a way to pass it into 
the `JobMaster`.




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347418#comment-16347418
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165155912
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
 ---
@@ -74,7 +74,7 @@
static final String EXPECTED_CLASS_NAME = 
"org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
 
/** Expected method name for back pressure indicating stack trace 
element. */
-   static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
+   static final String EXPECTED_METHOD_NAME = 
"requestBufferBuilderBlocking";
--- End diff --

This requires a follow-up ticket to make it less fragile.
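
To show why matching on a hard-coded method name is fragile, here is a 
simplified sketch of such a check. The two constants are copied from the diff 
above; `BackPressureFrameSketch` and `isBackPressured` are hypothetical, and 
scanning every frame is an assumption of this example, since the tracker's 
actual matching logic is not shown in this thread.

```java
/** Hypothetical illustration of string-based stack frame matching. */
final class BackPressureFrameSketch {

    static final String EXPECTED_CLASS_NAME =
            "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";

    static final String EXPECTED_METHOD_NAME = "requestBufferBuilderBlocking";

    /** Returns true if any frame matches the expected class and method name. */
    static boolean isBackPressured(StackTraceElement[] trace) {
        for (StackTraceElement frame : trace) {
            if (frame.getClassName().equals(EXPECTED_CLASS_NAME)
                    && frame.getMethodName().equals(EXPECTED_METHOD_NAME)) {
                return true;
            }
        }
        return false;
    }
}
```

Because the match is purely textual, a frame that still carries the old name 
`requestBufferBlocking` no longer counts as back pressured, and nothing at 
compile time flags the mismatch when the method is renamed.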




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347417#comment-16347417
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165155639
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final VertexBackPressureStatus status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final VertexBackPressureLevel backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   protected final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus 
status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) 
VertexBackPressureLevel backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   public static JobVertexBackPressureInfo deprecated() {
+   return new JobVertexBackPressureInfo(
+   VertexBackPressureStatus.DEPRECATED,
+   null,
+   null,
+   null);
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the sub tasks back 

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347414#comment-16347414
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165155368
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 ---
@@ -75,6 +76,17 @@ public void 
setDisconnectJobManagerConsumer(Consumer> d
return CompletableFuture.completedFuture(Acknowledge.get());
}
 
+   @Override
+   public CompletableFuture 
requestStackTraceSample(
--- End diff --

Needs to be implemented




[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347413#comment-16347413
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5397

[FLINK-7856][flip6] WIP

WIP

PR is based on #4893 

@tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7856-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5397.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5397


commit bea6bf16b2cac1a8da1f3d28432798965b64cea9
Author: zjureel 
Date:   2017-11-15T05:55:28Z

[FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint

commit 968f0dfe8a3c6832b0e6a83c5c61eaa1fd886c1b
Author: gyao 
Date:   2018-01-31T19:01:24Z

[hotfix][tests] Add Javadocs to JobMasterTest

Add Javadocs to JobMasterTest.
Remove debug print.

commit 6140fa6f460491bbe0eaf19d15b8a2f5d81622a0
Author: gyao 
Date:   2018-01-31T19:02:59Z

[FLINK-7856][flip6] Implement JobVertexBackPressureHandler

commit 4625a637fa0a17a0ea0a3a48952d562a29fa5c06
Author: gyao 
Date:   2018-01-31T19:06:09Z

[hotfix] Log swallowed exception in JobMaster






[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-01-29 Thread Gary Yao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343592#comment-16343592
 ] 

Gary Yao commented on FLINK-7856:
-

[~zjureel] Thank you for your work so far. I will rebase your changes against 
the current master, and add the remaining implementation if you are okay with 
it.



[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252985#comment-16252985
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4893
  
@tillrohrmann Thank you for your review. I have fixed the problems in this 
PR.


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241710#comment-16241710
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends 
AbstractExecutionGraphHandler {
+   /** Back pressure stats tracker. */
+   private final BackPressureStatsTracker backPressureStatsTracker;
+
+   /** Time after which stats are considered outdated. */
+   private final int refreshInterval;
+
+   public JobVertexBackPressureHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   MessageHeaders messageHeaders,
+   ExecutionGraphCache executionGraphCache,
+   Executor executor,
+   Configuration clusterConfiguration) {
+   super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+
+   // Back pressure stats tracker config
+   this.refreshInterval = 
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+   this.backPressureStatsTracker = new BackPressureStatsTracker(
+   new StackTraceSampleCoordinator(executor, 6),
+   
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+   
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+   
Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+   }
+
+   @Override
+   protected JobVertexBackPressureInfo 
handleRequest(HandlerRequest 
request, AccessExecutionGraph executionGraph) throws RestHandlerExcepti

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241708#comment-16241708
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the sub tasks back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_RATIO)
+   private final double ratio;

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241707#comment-16241707
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305236
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the sub tasks back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
--- End diff --

Is that the status or the subtask name? Adapt the field annotation 
accordingly.
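
A minimal sketch of what the rename could look like, assuming the field holds the subtask index and the JSON key stays `subtask`. The class name `SubtaskBackPressureInfoSketch` and the constant name `FIELD_NAME_SUBTASK` are hypothetical, and the Jackson annotations from the diff appear only as comments so the snippet compiles without Jackson on the classpath.

```java
/** Hypothetical sketch; not the code that was merged. */
final class SubtaskBackPressureInfoSketch {

    // Assumed new name: the constant now says which JSON key it carries.
    static final String FIELD_NAME_SUBTASK = "subtask";
    static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
    static final String FIELD_NAME_RATIO = "ratio";

    // In the PR this field would carry @JsonProperty(FIELD_NAME_SUBTASK).
    private final int subtask;

    // In the PR: @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
    private final String backpressureLevel;

    // In the PR: @JsonProperty(FIELD_NAME_RATIO)
    private final double ratio;

    SubtaskBackPressureInfoSketch(int subtask, String backpressureLevel, double ratio) {
        this.subtask = subtask;
        this.backpressureLevel = backpressureLevel;
        this.ratio = ratio;
    }

    int getSubtask() {
        return subtask;
    }

    String getBackpressureLevel() {
        return backpressureLevel;
    }

    double getRatio() {
        return ratio;
    }
}
```

With the constant renamed, the annotation and the field read the same way, which is what the comment above asks for.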


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
>

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241702#comment-16241702
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305060
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
--- End diff --

No need for the object type here. We can use a primitive `long`.
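
A small sketch of the field as a primitive, assuming an end timestamp is always available. `EndTimestampSketch` is a hypothetical stand-in for the response class, reduced to the one field under discussion.

```java
/** Hypothetical sketch of the timestamp field as a primitive. */
final class EndTimestampSketch {

    // A primitive long can never be null, so no null check is needed.
    private final long endTimestamp;

    EndTimestampSketch(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    long getEndTimestamp() {
        return endTimestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Primitives compare with ==; Objects.equals would only add boxing.
        return endTimestamp == ((EndTimestampSketch) o).endTimestamp;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(endTimestamp);
    }
}
```

If a missing timestamp ever has to be representable, the boxed type or a sentinel value would be needed instead; the thread does not say whether that case occurs.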


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241700#comment-16241700
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149304998
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
--- End diff --

Same here, this should be an enum.
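
One way the enum could look, as a sketch only. The name `BackPressureLevelSketch`, the three constants, and the lowercase JSON spellings are assumptions and are not taken from the PR; in real code the mapping to JSON would also need a Jackson hook such as `@JsonValue` on the getter.

```java
/** Hypothetical enum; the constant set and JSON spellings are assumptions. */
enum BackPressureLevelSketch {
    OK("ok"),
    LOW("low"),
    HIGH("high");

    private final String jsonValue;

    BackPressureLevelSketch(String jsonValue) {
        this.jsonValue = jsonValue;
    }

    /** String that would appear in the JSON response. */
    String getJsonValue() {
        return jsonValue;
    }

    /** Reverse lookup; rejects spellings that match no constant. */
    static BackPressureLevelSketch fromJsonValue(String value) {
        for (BackPressureLevelSketch level : values()) {
            if (level.jsonValue.equals(value)) {
                return level;
            }
        }
        throw new IllegalArgumentException("Unknown back pressure level: " + value);
    }
}
```

Compared with a plain `String`, a typo such as `"hihg"` then fails at the lookup instead of silently reaching the client.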


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint





[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241705#comment-16241705
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305184
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the sub tasks back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
--- End diff --

enum


> Port JobVertexBackPressureHandler to REST endpoint
> ---

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241703#comment-16241703
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
--- End diff --

Null checks are missing.
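
A sketch of constructor null checks, using `java.util.Objects.requireNonNull` so it runs on the JDK alone. `NullCheckedInfoSketch` and the `List<String>` element type are hypothetical simplifications; Flink code would more likely use its own `Preconditions.checkNotNull`, which is an assumption about project style rather than something stated in this thread.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical, reduced version of the response class with null checks. */
final class NullCheckedInfoSketch {

    private final String status;
    private final String backpressureLevel;
    private final List<String> subtasks;

    NullCheckedInfoSketch(String status, String backpressureLevel, List<String> subtasks) {
        // Fail at construction time instead of later in equals/hashCode or serialization.
        this.status = Objects.requireNonNull(status, "status must not be null");
        this.backpressureLevel =
            Objects.requireNonNull(backpressureLevel, "backpressureLevel must not be null");
        this.subtasks = Objects.requireNonNull(subtasks, "subtasks must not be null");
    }

    String getStatus() {
        return status;
    }

    String getBackpressureLevel() {
        return backpressureLevel;
    }

    List<String> getSubtasks() {
        return subtasks;
    }
}
```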


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint





[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241698#comment-16241698
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4893
  
I think the plan sounds good @zjureel. We should however not do it in this 
PR. Thus, I would suggest that we create a `JobVertexBackPressureHandler` which 
directly inherits from `AbstractRestHandler` and for the time being returns an 
empty `JobVertexBackPressureInfo`. Then I would add the 
`BackPressureStatsTracker` to the `JobMaster` in a separate PR. What do you 
think?


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint





[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241699#comment-16241699
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149304880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
--- End diff --

This should be an enum, I think.


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint





[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241412#comment-16241412
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4893
  
@tillrohrmann Thank you for your suggestions. I think moving these things 
to the `JobMaster` would be good. I think this issue could be fixed as follows:

1. Add a `CompletableFuture requestVertexBackPressure(JobID jobId, JobVertexID vertexID, @RpcTimeout Time timeout);` method to `RestfulGateway` and `JobMasterGateway`.
2. Add `BackPressureStatsTracker` to `JobMaster`, and return `JobVertexBackPressureInfo` from the `requestVertexBackPressure` method.
3. Use `LegacyRestHandlerAdapter` in `DispatcherRestEndpoint` for `JobVertexBackPressureHandler`.

What do you think? Thanks.
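
The shape of this plan can be sketched with plain JDK types. Everything below is hypothetical: `BackPressureInfoSketch`, `BackPressureGatewaySketch` and `BackPressureHandlerSketch` stand in for `JobVertexBackPressureInfo`, the gateway interfaces and the handler, IDs are plain strings, and the RPC timeout parameter is left out. The point is only that the handler forwards the request to the gateway and no longer owns a stats tracker.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the response type. */
final class BackPressureInfoSketch {
    final String status;

    BackPressureInfoSketch(String status) {
        this.status = status;
    }
}

/** Hypothetical stand-in for the gateway method proposed in step 1. */
interface BackPressureGatewaySketch {
    CompletableFuture<BackPressureInfoSketch> requestVertexBackPressure(String jobId, String vertexId);
}

/** Hypothetical handler: it only delegates, the tracker lives behind the gateway. */
final class BackPressureHandlerSketch {

    private final BackPressureGatewaySketch gateway;

    BackPressureHandlerSketch(BackPressureGatewaySketch gateway) {
        this.gateway = gateway;
    }

    CompletableFuture<BackPressureInfoSketch> handleRequest(String jobId, String vertexId) {
        // The asynchronous result is passed through unchanged.
        return gateway.requestVertexBackPressure(jobId, vertexId);
    }
}
```

Because the gateway is an interface with a single method, a test can supply it as a lambda that returns an already completed future.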


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint





[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238114#comment-16238114
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r148857309
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler<JobVertexBackPressureInfo, JobVertexMessageParameters> {
+   /** Back pressure stats tracker. */
+   private final BackPressureStatsTracker backPressureStatsTracker;
+
+   /** Time after which stats are considered outdated. */
+   private final int refreshInterval;
+
+   public JobVertexBackPressureHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> messageHeaders,
+   ExecutionGraphCache executionGraphCache,
+   Executor executor,
+   Configuration clusterConfiguration) {
+   super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+
+   // Back pressure stats tracker config
+   this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+   this.backPressureStatsTracker = new BackPressureStatsTracker(
+   new StackTraceSampleCoordinator(executor, 6),
+   clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+   clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+   Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+   }
+
+   @Override
+   protected JobVertexBackPressureInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerExcepti

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238115#comment-16238115 ]

ASF GitHub Bot commented on FLINK-7856:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r148857541
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureHeaders implements MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> {
+
+   private static final JobVertexBackPressureHeaders INSTANCE = new JobVertexBackPressureHeaders();
+
+   private static final String URL = "/jobs/:jobid/vertices/:vertexid/backpressure";
--- End diff --

Instead of hard-coding `/jobs/:jobid`, we could write `"/jobs/:" + JobIDParameter.KEY`.
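
A minimal sketch of this suggestion, for illustration only. `JobIDParameter.KEY` is the name used in the comment; the nested stand-in classes below, the use of `JobVertexIdPathParameter.KEY` for the vertex segment, and the key values `"jobid"` and `"vertexid"` are assumptions inferred from the URL literal in the diff, not taken from the Flink sources.

```java
public class BackPressureUrlSketch {

    // Hypothetical stand-in for the job ID path parameter class named in the review comment.
    static final class JobIDParameter {
        static final String KEY = "jobid"; // assumed value, inferred from ":jobid" in the diff
    }

    // Hypothetical stand-in for the vertex ID path parameter class imported by the handler.
    static final class JobVertexIdPathParameter {
        static final String KEY = "vertexid"; // assumed value, inferred from ":vertexid" in the diff
    }

    // The URL is assembled from the parameter keys, so renaming a key
    // changes the route and the parameter lookup together.
    public static final String URL =
        "/jobs/:" + JobIDParameter.KEY
            + "/vertices/:" + JobVertexIdPathParameter.KEY
            + "/backpressure";

    public static void main(String[] args) {
        System.out.println(URL);
    }
}
```

Because every operand is a compile-time constant, the concatenation is folded by the compiler and `URL` remains a constant expression, so this costs nothing at runtime compared with the literal.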


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Port JobVertexBackPressureHandler to REST endpoint





[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16216505#comment-16216505 ]

ASF GitHub Bot commented on FLINK-7856:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4893

[FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint

## What is the purpose of the change

Port JobVertexBackPressureHandler to REST endpoint

## Brief change log

  - *Add the JobVertexBackPressureInfo class to describe the JSON response format*
  - *Add JobVertexBackPressureHandler to serve back pressure requests in the REST server*


## Verifying this change
This change added tests and can be verified as follows:

  - *Added test case JobVertexBackPressureInfoTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7856

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4893.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4893


commit 3c5c183ff82e04174f66553fa28aaafa1664f478
Author: zjureel 
Date:   2017-10-24T05:53:48Z

[FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint




> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>
> Port JobVertexBackPressureHandler to REST endpoint


