[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5805 ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181691127 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java --- @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics + * or the aggregated values of them across all/selected entities. + * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned. + * {@code /metrics?get=X,Y&agg=min,max} + * The handler will then return a list of objects containing the aggregations for the requested metrics. + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]} + */ +public abstract class AbstractAggregatingMetricsHandler> extends AbstractRestHandler { + + private final Executor executor; + private final MetricFetcher fetcher; + + protected AbstractAggregatingMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + AbstractAggregatedMetricsHeaders messageHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, respon
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181660436 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java --- @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics + * or the aggregated values of them across all/selected entities. + * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned. + * {@code /metrics?get=X,Y&agg=min,max} + * The handler will then return a list of objects containing the aggregations for the requested metrics. + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]} + */ +public abstract class AbstractAggregatingMetricsHandler> extends AbstractRestHandler { + + private final Executor executor; + private final MetricFetcher fetcher; + + protected AbstractAggregatingMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + AbstractAggregatedMetricsHeaders messageHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseH
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181664352 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +/** + * Headers for aggregating subtask metrics. + */ +public class AggregatedSubtaskMetricsHeaders extends AbstractAggregatedMetricsHeaders { + + private static final AggregatedSubtaskMetricsHeaders INSTANCE = new AggregatedSubtaskMetricsHeaders(); + + private AggregatedSubtaskMetricsHeaders() { + } + + @Override + public AggregatedSubtaskMetricsParameters getUnresolvedMessageParameters() { + return new AggregatedSubtaskMetricsParameters(); + } + + @Override + public String getTargetRestEndpointURL() { + return "/jobs/:jobid/vertices/:vertexid/subtasks/metrics"; --- End diff -- normally `"/jobs/:" + JobIDPathParameter.KEY + "vertices/:" [...]` is used ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181666171 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Based {@link MessageHeaders} class for aggregating metrics. + */ +public abstract class AbstractAggregatedMetricsHeaders implements MessageHeaders { --- End diff -- nit: raw types (`AbstractAggregatedMetricsParameters`) ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181583359 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java --- @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +/** + * An interface for accumulating double values. + */ +interface DoubleAccumulator { --- End diff -- Remembered why I did it this way. We would have to make the entire `DoubleAccumulator` class public otherwise. ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181534458 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. + * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Override + Collection getStores(MetricStore store, HandlerRequest request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); + } else { + Iterable subtasks = getIntegerRangeFromString(subtaskRanges); + Collection subtaskStores = new ArrayList<>(8); + for (int subtask : subtasks) { + subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask)); + } + return subtaskStores; + } + } + + private Iterable getIntegerRangeFromString(Collection ranges) { + UnionIterator iterators = new UnionIterator<>(); + + for (String rawRange : ranges) { + try { + Iterator rangeIterator; + String range = rawRange.trim(); +
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181534395 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java --- @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +/** + * An interface for accumulating double values. + */ +interface DoubleAccumulator { --- End diff -- ah yes, I copied it at the start but never got around to deleting the old one.. ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181414763 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java --- @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +/** + * An interface for accumulating double values. + */ +interface DoubleAccumulator { --- End diff -- This is virtually the same as `org.apache.flink.runtime.rest.handler.legacy.metrics.DoubleAccumulator`. Why do we have to duplicate it? ``` diff ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java 19c19 < package org.apache.flink.runtime.rest.handler.legacy.metrics; --- > package org.apache.flink.runtime.rest.handler.job.metrics; ``` ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181404252 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java --- @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics + * or the aggregated values of them across all/selected entities. + * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned. + * {@code /metrics?get=X,Y&agg=min,max} + * The handler will then return a list of objects containing the aggregations for the requested metrics. + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]} + */ +public abstract class AbstractAggregatingMetricsHandler> extends AbstractRestHandler { + + private final Executor executor; + private final MetricFetcher fetcher; + + protected AbstractAggregatingMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + AbstractAggregatedMetricsHeaders messageHeaders, + Executor executor, + MetricFetcher fetcher) { --- End diff -- nit: use of rawtype ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181411022 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. + * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Override + Collection getStores(MetricStore store, HandlerRequest request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); + } else { + Iterable subtasks = getIntegerRangeFromString(subtaskRanges); + Collection subtaskStores = new ArrayList<>(8); + for (int subtask : subtasks) { + subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask)); + } + return subtaskStores; + } + } + + private Iterable getIntegerRangeFromString(Collection ranges) { + UnionIterator iterators = new UnionIterator<>(); + + for (String rawRange : ranges) { + try { + Iterator rangeIterator; + String range = rawRange.trim(); +
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181408771 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. + * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Override + Collection getStores(MetricStore store, HandlerRequest request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); --- End diff -- I think there is a potential NPE because `store.getTaskMetricStore(jobID.toString(), taskID.toString())` can return `null`. ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181404370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java --- @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics + * or the aggregated values of them across all/selected entities. + * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned. + * {@code /metrics?get=X,Y&agg=min,max} + * The handler will then return a list of objects containing the aggregations for the requested metrics. + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]} + */ +public abstract class AbstractAggregatingMetricsHandler> extends AbstractRestHandler { + + private final Executor executor; + private final MetricFetcher fetcher; + + protected AbstractAggregatingMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + AbstractAggregatedMetricsHeaders messageHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseH
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181401972 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Locale; + +/** + * TODO: add javadoc. --- End diff -- Should be replaced. ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181411248 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. + * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Override + Collection getStores(MetricStore store, HandlerRequest request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); + } else { + Iterable subtasks = getIntegerRangeFromString(subtaskRanges); + Collection subtaskStores = new ArrayList<>(8); + for (int subtask : subtasks) { + subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask)); + } + return subtaskStores; + } + } + + private Iterable getIntegerRangeFromString(Collection ranges) { + UnionIterator iterators = new UnionIterator<>(); + + for (String rawRange : ranges) { + try { + Iterator rangeIterator; + String range = rawRange.trim(); +
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5805 [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6 ## What is the purpose of the change This PR ports the `AggregatingMetricsHandler` classes to flip6. Additionally this PR contains 2 minor changes: * the `MessageParameter` constructor is now `protected` * the converter methods of `QueryParameters` may now also throw `ConversionExceptions` like their `PathParamater` counter-part ## Brief change log * the `MessageParameter` constructor is now `protected` * the converter methods of `QueryParameters` may now also throw `ConversionExceptions` like their `PathParamater` counter-part * port `AggregatingMetricsHandler` to flip6 * duplicate `DoubleAccumulator` class * define headers, various parameters and response bodies * define handlers * conceptually they work like the legacy versions, but I had to beef up the aggregation factories a bit as the existing approach wasn't compatible with an actual constructor * add handlers to WebMonitorEndpoint ## Verifying this change This change added tests and can be verified as follows: * run tests extending `AggregatingMetricsHandlerTestBase`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (documented (via legacy documentation)) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8370 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5805.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5805 commit e238b85d12feda1f66de9ebf8aaaeaa223c3129d Author: zentol Date: 2018-03-26T12:41:03Z [hotfix][metrics] Make MessageParameter constructor protected commit 188f5ef96bce2a7fa8ee382ab1d5fc900d078149 Author: zentol Date: 2018-03-26T12:41:25Z [hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions commit 85d00792ca34849034bc7c59a9b7d07ed4fff486 Author: zentol Date: 2018-03-28T10:52:07Z [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6 ---