[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2018-05-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4893


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305236
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the sub tasks back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
--- End diff --

Is that the status or the subtask name? Adapt the field annotation accordingly.
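A minimal sketch of the rename this comment points at, assuming the field holds the subtask index and the constant should therefore be called `FIELD_NAME_SUBTASK`. The nested `JsonProperty` annotation is a local stand-in for the shaded Jackson annotation so that the example compiles on its own; `jsonNameOf` is a hypothetical helper used only to show which JSON name each field ends up with.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class SubtaskFieldNamingSketch {

    // Local stand-in for Jackson's @JsonProperty (assumption: only the name value matters here).
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD, ElementType.PARAMETER})
    @interface JsonProperty {
        String value();
    }

    static final class SubtaskBackPressureInfo {
        // Renamed from FIELD_NAME_STATUS: the value describes the subtask, not a status.
        static final String FIELD_NAME_SUBTASK = "subtask";
        static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
        static final String FIELD_NAME_RATIO = "ratio";

        @JsonProperty(FIELD_NAME_SUBTASK)
        private final int subtask;

        @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
        private final String backpressureLevel;

        @JsonProperty(FIELD_NAME_RATIO)
        private final double ratio;

        SubtaskBackPressureInfo(
                @JsonProperty(FIELD_NAME_SUBTASK) int subtask,
                @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel,
                @JsonProperty(FIELD_NAME_RATIO) double ratio) {
            this.subtask = subtask;
            this.backpressureLevel = backpressureLevel;
            this.ratio = ratio;
        }
    }

    /** Hypothetical helper: returns the JSON name declared on the given field. */
    static String jsonNameOf(String fieldName) {
        try {
            return SubtaskBackPressureInfo.class
                    .getDeclaredField(fieldName)
                    .getAnnotation(JsonProperty.class)
                    .value();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException("Unknown field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(jsonNameOf("subtask"));
        System.out.println(jsonNameOf("backpressureLevel"));
    }
}
```

The JSON output is unchanged by such a rename, since the constant's value stays `"subtask"`; only the Java identifier stops suggesting a status.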


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305184
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the sub tasks back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
--- End diff --

This should be an enum.
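One way the enum suggestion could look, as a rough sketch only. The constant names `OK`, `LOW` and `HIGH` and the lower-case JSON form are assumptions and are not taken from the pull request; `toJson` and `fromJson` are hypothetical helpers standing in for whatever Jackson mapping the real class would use.

```java
import java.util.Locale;

public class BackPressureLevelSketch {

    // Hypothetical enum replacing the String-typed backpressureLevel field.
    enum BackPressureLevel {
        OK,
        LOW,
        HIGH;

        /** Lower-case form, as it might be written into the JSON response. */
        String toJson() {
            return name().toLowerCase(Locale.ROOT);
        }

        /** Parses the JSON form; unknown values fail with IllegalArgumentException. */
        static BackPressureLevel fromJson(String value) {
            return valueOf(value.toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        System.out.println(BackPressureLevel.HIGH.toJson());
        System.out.println(BackPressureLevel.fromJson("low"));
    }
}
```

Compared with a plain `String`, the enum restricts the field to a closed set of values, so a typo becomes a compile-time or parse-time error instead of reaching the client.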


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends 
AbstractExecutionGraphHandler {
+   /** Back pressure stats tracker. */
+   private final BackPressureStatsTracker backPressureStatsTracker;
+
+   /** Time after which stats are considered outdated. */
+   private final int refreshInterval;
+
+   public JobVertexBackPressureHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   MessageHeaders messageHeaders,
+   ExecutionGraphCache executionGraphCache,
+   Executor executor,
+   Configuration clusterConfiguration) {
+   super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+
+   // Back pressure stats tracker config
+   this.refreshInterval = 
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+   this.backPressureStatsTracker = new BackPressureStatsTracker(
+   new StackTraceSampleCoordinator(executor, 6),
+   
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+   
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+   
Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+   }
+
+   @Override
+   protected JobVertexBackPressureInfo 
handleRequest(HandlerRequest 
request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+   JobVertexID jobVertexID = 

[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
--- End diff --

Null checks are missing.
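A sketch of what constructor-side null checks could look like, assuming every reference field is meant to be mandatory (the comment does not say which fields may legitimately be absent). It uses `java.util.Objects.requireNonNull` to stay dependency-free; inside Flink, `Preconditions.checkNotNull` would be the more likely choice. `BackPressureInfo` and `rejects` are hypothetical names for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class NullCheckSketch {

    /** Hypothetical, trimmed-down response type; only the constructor checks matter here. */
    static final class BackPressureInfo {
        final String status;
        final String backpressureLevel;
        final List<String> subtasks;

        BackPressureInfo(String status, String backpressureLevel, List<String> subtasks) {
            // Fail fast at construction time instead of at some later use site.
            this.status = Objects.requireNonNull(status, "status must not be null");
            this.backpressureLevel =
                    Objects.requireNonNull(backpressureLevel, "backpressureLevel must not be null");
            this.subtasks = Objects.requireNonNull(subtasks, "subtasks must not be null");
        }
    }

    /** Returns true if constructing the response with the given status is rejected. */
    static boolean rejects(String status) {
        try {
            new BackPressureInfo(status, "ok", Arrays.asList("subtask-0"));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(null));
        System.out.println(rejects("ok"));
    }
}
```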


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305060
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
--- End diff --

No need for the object type here. We can use a primitive long.
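To illustrate the difference the comment alludes to, here is a small sketch with two hypothetical holder classes. With the boxed `Long`, a `null` can slip in and only fails later on unboxing; with the primitive `long`, that state cannot be represented at all. The method name `elapsedSince` is invented for this example.

```java
public class TimestampTypeSketch {

    /** Boxed variant: the field may be null, and unboxing it then throws. */
    static final class BoxedTimestamp {
        final Long endTimestamp;

        BoxedTimestamp(Long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }

        long elapsedSince(long now) {
            return now - endTimestamp; // implicit unboxing, NullPointerException if null
        }
    }

    /** Primitive variant: no null state, no unboxing. */
    static final class PrimitiveTimestamp {
        final long endTimestamp;

        PrimitiveTimestamp(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }

        long elapsedSince(long now) {
            return now - endTimestamp;
        }
    }

    /** Returns true if using a null boxed timestamp fails with a NullPointerException. */
    static boolean boxedNullFails() {
        try {
            new BoxedTimestamp(null).elapsedSince(10L);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(boxedNullFails());
        System.out.println(new PrimitiveTimestamp(4L).elapsedSince(10L));
    }
}
```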


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149305275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+   private final Long endTimestamp;
+
+   @JsonProperty(FIELD_NAME_SUBTASKS)
+   private final List subtasks;
+
+   @JsonCreator
+   public JobVertexBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) String status,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+   @JsonProperty(FIELD_NAME_SUBTASKS) 
List subtasks) {
+   this.status = status;
+   this.backpressureLevel = backpressureLevel;
+   this.endTimestamp = endTimestamp;
+   this.subtasks = subtasks;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+   return Objects.equals(status, that.status) &&
+   Objects.equals(backpressureLevel, 
that.backpressureLevel) &&
+   Objects.equals(endTimestamp, that.endTimestamp) &&
+   Objects.equals(subtasks, that.subtasks);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(status, backpressureLevel, endTimestamp, 
subtasks);
+   }
+
+   
//-
+   // Static helper classes
+   
//-
+
+   /**
+* Nested class to encapsulate the sub tasks back pressure.
+*/
+   public static final class SubtaskBackPressureInfo {
+
+   public static final String FIELD_NAME_STATUS = "subtask";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = 
"backpressure-level";
+   public static final String FIELD_NAME_RATIO = "ratio";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final int subtask;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
+
+   @JsonProperty(FIELD_NAME_RATIO)
+   private final double ratio;
+
+   public SubtaskBackPressureInfo(
+   @JsonProperty(FIELD_NAME_STATUS) int subtask,
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String 
backpressureLevel,
+   

[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149304998
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
+
+   @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+   private final String backpressureLevel;
--- End diff --

Same here, this should be an enum.


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-07 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r149304880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+   public static final String FIELD_NAME_STATUS = "status";
+   public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+   public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+   public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+   @JsonProperty(FIELD_NAME_STATUS)
+   private final String status;
--- End diff --

This should be an enum, I think.


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r148857309
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex back pressure.
+ */
+public class JobVertexBackPressureHandler extends 
AbstractExecutionGraphHandler {
+   /** Back pressure stats tracker. */
+   private final BackPressureStatsTracker backPressureStatsTracker;
+
+   /** Time after which stats are considered outdated. */
+   private final int refreshInterval;
+
+   public JobVertexBackPressureHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   MessageHeaders messageHeaders,
+   ExecutionGraphCache executionGraphCache,
+   Executor executor,
+   Configuration clusterConfiguration) {
+   super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+
+   // Back pressure stats tracker config
+   this.refreshInterval = 
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
+   this.backPressureStatsTracker = new BackPressureStatsTracker(
+   new StackTraceSampleCoordinator(executor, 6),
+   
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
+   
clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
+   
Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
+   }
+
+   @Override
+   protected JobVertexBackPressureInfo 
handleRequest(HandlerRequest 
request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+   JobVertexID jobVertexID = 

[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-11-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4893#discussion_r148857541
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexBackPressureHandler}.
+ */
+public class JobVertexBackPressureHeaders implements MessageHeaders {
+
+   private static final JobVertexBackPressureHeaders INSTANCE = new JobVertexBackPressureHeaders();
+
+   private static final String URL = "/jobs/:jobid/vertices/:vertexid/backpressure";
--- End diff --

Instead of writing `/jobs/:jobid` we could write `"/jobs/:" + JobIDParameter.KEY`.
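A sketch of how the URL could be assembled from the parameter keys rather than from a hard-coded literal. The two nested classes are hypothetical stand-ins for the real path parameter classes, and their `KEY` values are assumptions chosen to match the literal URL in the diff.

```java
public class BackPressureUrlSketch {

    // Hypothetical stand-ins for the path parameter classes; the KEY values are assumed.
    static final class JobIDParameter {
        static final String KEY = "jobid";
    }

    static final class JobVertexIdPathParameter {
        static final String KEY = "vertexid";
    }

    // The URL template is derived from the keys, so a renamed key cannot drift from the URL.
    static final String URL =
            "/jobs/:" + JobIDParameter.KEY
                    + "/vertices/:" + JobVertexIdPathParameter.KEY
                    + "/backpressure";

    public static void main(String[] args) {
        System.out.println(URL);
    }
}
```

Because all operands are compile-time constants, the concatenation is itself a constant expression, so nothing is computed at runtime.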


---


[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...

2017-10-24 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4893

[FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint

## What is the purpose of the change

Port JobVertexBackPressureHandler to REST endpoint

## Brief change log

  - *Add JobVertexBackPressureInfo class to describe the JSON response format*
  - *Add JobVertexBackPressureHandler to deal with back pressure in the REST server*


## Verifying this change
This change added tests and can be verified as follows:

  - *Added test case JobVertexBackPressureInfoTest*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7856

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4893.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4893


commit 3c5c183ff82e04174f66553fa28aaafa1664f478
Author: zjureel 
Date:   2017-10-24T05:53:48Z

[FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint




---