[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4893 ---
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305236 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o; + return Objects.equals(status, that.status) && + Objects.equals(backpressureLevel, that.backpressureLevel) && + Objects.equals(endTimestamp, that.endTimestamp) && + Objects.equals(subtasks, that.subtasks); + } + + @Override + public int hashCode() { + return Objects.hash(status, backpressureLevel, endTimestamp, subtasks); + } + + //- + // Static helper classes + //- + + /** +* Nested class to encapsulate the sub tasks back pressure. +*/ + public static final class SubtaskBackPressureInfo { + + public static final String FIELD_NAME_STATUS = "subtask"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_RATIO = "ratio"; + + @JsonProperty(FIELD_NAME_STATUS) + private final int subtask; --- End diff -- is that the status or the subtask name? Adapt the field annotation accordingly. ---
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305184 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o; + return Objects.equals(status, that.status) && + Objects.equals(backpressureLevel, that.backpressureLevel) && + Objects.equals(endTimestamp, that.endTimestamp) && + Objects.equals(subtasks, that.subtasks); + } + + @Override + public int hashCode() { + return Objects.hash(status, backpressureLevel, endTimestamp, subtasks); + } + + //- + // Static helper classes + //- + + /** +* Nested class to encapsulate the sub tasks back pressure. +*/ + public static final class SubtaskBackPressureInfo { + + public static final String FIELD_NAME_STATUS = "subtask"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_RATIO = "ratio"; + + @JsonProperty(FIELD_NAME_STATUS) + private final int subtask; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; --- End diff -- enum ---
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305450 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the job vertex back pressure. + */ +public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler{ + /** Back pressure stats tracker. */ + private final BackPressureStatsTracker backPressureStatsTracker; + + /** Time after which stats are considered outdated. */ + private final int refreshInterval; + + public JobVertexBackPressureHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + Configuration clusterConfiguration) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + + // Back pressure stats tracker config + this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL); + this.backPressureStatsTracker = new BackPressureStatsTracker( + new StackTraceSampleCoordinator(executor, 6), + clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL), + clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES), + Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY))); + } + + @Override + protected JobVertexBackPressureInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { + JobVertexID jobVertexID =
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; --- End diff -- null checks are missing. ---
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305060 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; --- End diff -- no need for a the object type here. We can use a primitive long. ---
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149305275 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_END_TIMESTAMP) + private final Long endTimestamp; + + @JsonProperty(FIELD_NAME_SUBTASKS) + private final List subtasks; + + @JsonCreator + public JobVertexBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) String status, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, + @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp, + @JsonProperty(FIELD_NAME_SUBTASKS) List subtasks) { + this.status = status; + this.backpressureLevel = backpressureLevel; + this.endTimestamp = endTimestamp; + this.subtasks = subtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o; + return Objects.equals(status, that.status) && + Objects.equals(backpressureLevel, that.backpressureLevel) && + Objects.equals(endTimestamp, that.endTimestamp) && + Objects.equals(subtasks, that.subtasks); + } + + @Override + public int hashCode() { + return Objects.hash(status, backpressureLevel, endTimestamp, subtasks); + } + + //- + // Static helper classes + //- + + /** +* Nested class to encapsulate the sub tasks back pressure. +*/ + public static final class SubtaskBackPressureInfo { + + public static final String FIELD_NAME_STATUS = "subtask"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_RATIO = "ratio"; + + @JsonProperty(FIELD_NAME_STATUS) + private final int subtask; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; + + @JsonProperty(FIELD_NAME_RATIO) + private final double ratio; + + public SubtaskBackPressureInfo( + @JsonProperty(FIELD_NAME_STATUS) int subtask, + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) String backpressureLevel, +
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149304998 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) + private final String backpressureLevel; --- End diff -- Same here, this should be an enum. ---
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r149304880 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +/** + * Response type of the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureInfo implements ResponseBody { + + public static final String FIELD_NAME_STATUS = "status"; + public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level"; + public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp"; + public static final String FIELD_NAME_SUBTASKS = "subtasks"; + + @JsonProperty(FIELD_NAME_STATUS) + private final String status; --- End diff -- This should be an enum, I think. ---
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r148857309 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandler.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler for the job vertex back pressure. + */ +public class JobVertexBackPressureHandler extends AbstractExecutionGraphHandler{ + /** Back pressure stats tracker. */ + private final BackPressureStatsTracker backPressureStatsTracker; + + /** Time after which stats are considered outdated. */ + private final int refreshInterval; + + public JobVertexBackPressureHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + Configuration clusterConfiguration) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + + // Back pressure stats tracker config + this.refreshInterval = clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL); + this.backPressureStatsTracker = new BackPressureStatsTracker( + new StackTraceSampleCoordinator(executor, 6), + clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL), + clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES), + Time.milliseconds(clusterConfiguration.getInteger(WebOptions.BACKPRESSURE_DELAY))); + } + + @Override + protected JobVertexBackPressureInfo handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { + JobVertexID jobVertexID =
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4893#discussion_r148857541 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureHeaders.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link JobVertexBackPressureHandler}. + */ +public class JobVertexBackPressureHeaders implements MessageHeaders{ + + private static final JobVertexBackPressureHeaders INSTANCE = new JobVertexBackPressureHeaders(); + + private static final String URL = "/jobs/:jobid/vertices/:vertexid/backpressure"; --- End diff -- Instead of writing `/jobs/:jobid` we could write `/jobs/: + JobIDParameter.KEY`. ---
[GitHub] flink pull request #4893: [FLINK-7856][flip6] Port JobVertexBackPressureHand...
GitHub user zjureel opened a pull request: https://github.com/apache/flink/pull/4893 [FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint ## What is the purpose of the change Port JobVertexBackPressureHandler to REST endpoint ## Brief change log - *Add JobVertexBackPressureInfo class to describe the json format response* - *Add JobVertexBackPressureHandler to deal with back pressure in rest server* ## Verifying this change This change added tests and can be verified as follows: - *Added test case JobVertexBackPressureInfoTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zjureel/flink FLINK-7856 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4893.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4893 commit 3c5c183ff82e04174f66553fa28aaafa1664f478 Author: zjureelDate: 2017-10-24T05:53:48Z [FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint ---