[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5805


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181691127
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * If the query parameters do not contain a "get" parameter, the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ]}
+ */
+public abstract class AbstractAggregatingMetricsHandler> extends 
AbstractRestHandler {
+
+   private final Executor executor;
+   private final MetricFetcher fetcher;
+
+   protected AbstractAggregatingMetricsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   AbstractAggregatedMetricsHeaders messageHeaders,
+   Executor executor,
+   MetricFetcher 

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-16 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181660436
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * If the query parameters do not contain a "get" parameter, the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ]}
+ */
+public abstract class AbstractAggregatingMetricsHandler> extends 
AbstractRestHandler {
+
+   private final Executor executor;
+   private final MetricFetcher fetcher;
+
+   protected AbstractAggregatingMetricsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   AbstractAggregatedMetricsHeaders messageHeaders,
+   Executor executor,
+   MetricFetcher 

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-16 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181664352
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+/**
+ * Headers for aggregating subtask metrics.
+ */
+public class AggregatedSubtaskMetricsHeaders extends 
AbstractAggregatedMetricsHeaders {
+
+   private static final AggregatedSubtaskMetricsHeaders INSTANCE = new 
AggregatedSubtaskMetricsHeaders();
+
+   private AggregatedSubtaskMetricsHeaders() {
+   }
+
+   @Override
+   public AggregatedSubtaskMetricsParameters 
getUnresolvedMessageParameters() {
+   return new AggregatedSubtaskMetricsParameters();
+   }
+
+   @Override
+   public String getTargetRestEndpointURL() {
+   return "/jobs/:jobid/vertices/:vertexid/subtasks/metrics";
--- End diff --

Normally `"/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" [...]` is used.
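
A minimal sketch of the convention mentioned above, building the endpoint URL from key constants instead of a hard-coded string. The two constants below are stand-ins defined locally for illustration (their values are taken from the literal URL in the diff); in the real code the keys would come from the path parameter classes such as `JobIDPathParameter.KEY`.

```java
class EndpointUrlSketch {
    // Local stand-ins for the KEY constants of the path parameter classes.
    // The values mirror the literal URL quoted in the diff above.
    static final String JOB_ID_KEY = "jobid";
    static final String VERTEX_ID_KEY = "vertexid";

    // Composes the URL from the keys so that a renamed key cannot silently
    // diverge from the path template.
    static String targetUrl() {
        return "/jobs/:" + JOB_ID_KEY
            + "/vertices/:" + VERTEX_ID_KEY
            + "/subtasks/metrics";
    }

    public static void main(String[] args) {
        System.out.println(targetUrl());
    }
}
```

The benefit is that the path template and the parameter classes share a single source of truth for the key names.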


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-16 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181666171
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractAggregatedMetricsHeaders.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Base {@link MessageHeaders} class for aggregating metrics.
+ */
+public abstract class AbstractAggregatedMetricsHeaders implements 
MessageHeaders {
--- End diff --

nit: raw types (`AbstractAggregatedMetricsParameters`)
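
To illustrate the nit in general terms, here is a small sketch contrasting a raw type with a parameterized one. `Params` is a hypothetical generic class invented for this example; it is not `AbstractAggregatedMetricsParameters`, whose type parameters are not visible in the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;

class RawTypeSketch {
    // Hypothetical generic container used only for this illustration.
    static class Params<T> {
        private final List<T> values = new ArrayList<>();

        void add(T value) {
            values.add(value);
        }

        T first() {
            return values.get(0);
        }
    }

    // Raw type: the compiler only issues an unchecked warning and the
    // static result type degrades to Object.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object firstRaw() {
        Params raw = new Params();
        raw.add("subtask-0");
        return raw.first();
    }

    // Parameterized type: element types are checked at compile time and
    // no cast is needed at the call site.
    static String firstTyped() {
        Params<String> typed = new Params<>();
        typed.add("subtask-0");
        return typed.first();
    }

    public static void main(String[] args) {
        System.out.println(firstRaw());
        System.out.println(firstTyped());
    }
}
```

When the type argument is genuinely unknown, a wildcard such as `Params<?>` usually keeps the declaration type-safe without resorting to the raw type.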


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181583359
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
 ---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
--- End diff --

Remembered why I did it this way. We would have to make the entire 
`DoubleAccumulator` class public otherwise.


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181534458
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends 
AbstractAggregatingMetricsHandler {
+
+   public AggregatingSubtasksMetricsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   Executor executor,
+   MetricFetcher fetcher) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, 
fetcher);
+   }
+
+   @Override
+   Collection 
getStores(MetricStore store, HandlerRequest request) {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   JobVertexID taskID = 
request.getPathParameter(JobVertexIdPathParameter.class);
+
+   Collection subtaskRanges = 
request.getQueryParameter(SubtasksFilterQueryParameter.class);
+   if (subtaskRanges.isEmpty()) {
+   return store.getTaskMetricStore(jobID.toString(), 
taskID.toString()).getAllSubtaskMetricStores();
+   } else {
+   Iterable subtasks = 
getIntegerRangeFromString(subtaskRanges);
+   Collection 
subtaskStores = new ArrayList<>(8);
+   for (int subtask : subtasks) {
+   
subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), 
taskID.toString(), subtask));
+   }
+   return subtaskStores;
+   }
+   }
+
+   private Iterable getIntegerRangeFromString(Collection 
ranges) {
+   UnionIterator iterators = new UnionIterator<>();
+
+   for (String rawRange : ranges) {
+   try {
+   Iterator rangeIterator;
+  
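
The quoted diff is cut off at this point. As a rough sketch of the behaviour its Javadoc describes (a comma-separated list of integer ranges such as `0-2,4-5`), the parsing could look like the following. This is an assumption-laden illustration, not the PR's code: the class and method names are invented, and it expands the ranges eagerly into a list, whereas the quoted code combines iterators through `UnionIterator`.

```java
import java.util.ArrayList;
import java.util.List;

class SubtaskRangeSketch {
    // Expands a specification like "0-2,4-5" into [0, 1, 2, 4, 5].
    // Single indices such as "7" are accepted as well.
    static List<Integer> parseRanges(String spec) {
        List<Integer> result = new ArrayList<>();
        for (String part : spec.split(",")) {
            String range = part.trim();
            int dash = range.indexOf('-');
            try {
                if (dash < 0) {
                    result.add(Integer.parseInt(range));
                } else {
                    int start = Integer.parseInt(range.substring(0, dash));
                    int end = Integer.parseInt(range.substring(dash + 1));
                    if (start > end) {
                        throw new IllegalArgumentException("Descending range: " + range);
                    }
                    for (int i = start; i <= end; i++) {
                        result.add(i);
                    }
                }
            } catch (NumberFormatException e) {
                // Surface malformed input with the offending token.
                throw new IllegalArgumentException("Invalid subtask range: " + range, e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseRanges("0-2,4-5"));
    }
}
```

An eager list is simpler to reason about; a lazy iterator, as in the quoted code, avoids materializing large ranges.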

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181534395
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
 ---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
--- End diff --

Ah yes, I copied it at the start but never got around to deleting the old one.


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181414763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
 ---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
--- End diff --

This is virtually the same as 
`org.apache.flink.runtime.rest.handler.legacy.metrics.DoubleAccumulator`. Why 
do we have to duplicate it?

```
diff ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
19c19
< package org.apache.flink.runtime.rest.handler.legacy.metrics;
---
> package org.apache.flink.runtime.rest.handler.job.metrics;
```
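
For readers unfamiliar with the class under discussion, an interface for accumulating double values might be sketched as below. This is only an illustration of the general idea behind the "sum", "max", "min" and "avg" aggregations; the interface shape, class names and method names are assumptions and do not reproduce Flink's actual `DoubleAccumulator`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class AccumulatorSketch {
    // Hypothetical accumulator contract, not Flink's real interface.
    interface Accumulator {
        String name();
        void add(double value);
        double value();
    }

    static final class Min implements Accumulator {
        private double min = Double.POSITIVE_INFINITY;
        public String name() { return "min"; }
        public void add(double v) { min = Math.min(min, v); }
        public double value() { return min; }
    }

    static final class Max implements Accumulator {
        private double max = Double.NEGATIVE_INFINITY;
        public String name() { return "max"; }
        public void add(double v) { max = Math.max(max, v); }
        public double value() { return max; }
    }

    static final class Sum implements Accumulator {
        private double sum;
        public String name() { return "sum"; }
        public void add(double v) { sum += v; }
        public double value() { return sum; }
    }

    static final class Avg implements Accumulator {
        private double sum;
        private int count;
        public String name() { return "avg"; }
        public void add(double v) { sum += v; count++; }
        // Yields NaN when no value has been added.
        public double value() { return sum / count; }
    }

    // Feeds every value to every accumulator and collects the results by name.
    static Map<String, Double> aggregate(double... values) {
        Accumulator[] accumulators = {new Min(), new Max(), new Sum(), new Avg()};
        for (double v : values) {
            for (Accumulator a : accumulators) {
                a.add(v);
            }
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (Accumulator a : accumulators) {
            result.put(a.name(), a.value());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(1, 4, 10));
    }
}
```

Keeping a single definition of such an interface, rather than one copy per package, is the point of the question above.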


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181404252
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * If the query parameters do not contain a "get" parameter, the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min" : "1", "max" : "2" }, { "id" : "Y", "min" : "4", "max" : "10" } ]}
+ */
+public abstract class AbstractAggregatingMetricsHandler> extends 
AbstractRestHandler {
+
+   private final Executor executor;
+   private final MetricFetcher fetcher;
+
+   protected AbstractAggregatingMetricsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   AbstractAggregatedMetricsHeaders messageHeaders,
+   Executor executor,
+   MetricFetcher 

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181411022
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends 
AbstractAggregatingMetricsHandler {
+
+   public AggregatingSubtasksMetricsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever 
leaderRetriever,
+   Time timeout,
+   Map responseHeaders,
+   Executor executor,
+   MetricFetcher fetcher) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, 
fetcher);
+   }
+
+   @Override
+   Collection 
getStores(MetricStore store, HandlerRequest request) {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   JobVertexID taskID = 
request.getPathParameter(JobVertexIdPathParameter.class);
+
+   Collection subtaskRanges = 
request.getQueryParameter(SubtasksFilterQueryParameter.class);
+   if (subtaskRanges.isEmpty()) {
+   return store.getTaskMetricStore(jobID.toString(), 
taskID.toString()).getAllSubtaskMetricStores();
+   } else {
+   Iterable subtasks = 
getIntegerRangeFromString(subtaskRanges);
+   Collection 
subtaskStores = new ArrayList<>(8);
+   for (int subtask : subtasks) {
+   
subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), 
taskID.toString(), subtask));
+   }
+   return subtaskStores;
+   }
+   }
+
+   private Iterable getIntegerRangeFromString(Collection 
ranges) {
+   UnionIterator iterators = new UnionIterator<>();
+
+   for (String rawRange : ranges) {
+   try {
+   Iterator rangeIterator;
+   

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181408771
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
+
+   public AggregatingSubtasksMetricsHandler(
+           CompletableFuture localRestAddress,
+           GatewayRetriever leaderRetriever,
+           Time timeout,
+           Map responseHeaders,
+           Executor executor,
+           MetricFetcher fetcher) {
+       super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+   }
+
+   @Override
+   Collection getStores(MetricStore store, HandlerRequest request) {
+       JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+       JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+       Collection subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+       if (subtaskRanges.isEmpty()) {
+           return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
--- End diff --

I think there is a potential NPE because 
`store.getTaskMetricStore(jobID.toString(), taskID.toString())` can return 
`null`.
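
One way to guard against this could look like the sketch below. It is only an illustration: `Store`, `TaskStore` and the map-based metric stores are hypothetical stand-ins rather than the actual `MetricStore` classes, and returning an empty collection for an unknown task is an assumption about the desired behavior (the handler could just as well answer with an error).

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Sketch of a null-safe lookup; all types are hypothetical stand-ins. */
public class NullSafeStoreLookup {

    /** Stand-in for a per-task store holding one metric map per subtask. */
    static class TaskStore {
        final Map<Integer, Map<String, String>> subtasks = new HashMap<>();

        Collection<Map<String, String>> getAllSubtaskStores() {
            return subtasks.values();
        }
    }

    /** Stand-in for the top-level store; the lookup returns null for unknown tasks. */
    static class Store {
        final Map<String, TaskStore> tasks = new HashMap<>();

        TaskStore getTaskStore(String jobId, String taskId) {
            return tasks.get(jobId + "/" + taskId);
        }
    }

    /** Returns the subtask stores, or an empty collection if the task is unknown. */
    static Collection<Map<String, String>> getStores(Store store, String jobId, String taskId) {
        TaskStore taskStore = store.getTaskStore(jobId, taskId);
        if (taskStore == null) {
            // Assumption: an unknown task simply contributes no stores.
            return Collections.emptyList();
        }
        return taskStore.getAllSubtaskStores();
    }

    public static void main(String[] args) {
        // No task registered, so the guarded lookup yields an empty collection.
        System.out.println(getStores(new Store(), "job", "task").isEmpty());
    }
}
```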


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181404370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 ---
@@ -0,0 +1,302 @@
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ *
+ * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler> extends AbstractRestHandler {
+
+   private final Executor executor;
+   private final MetricFetcher fetcher;
+
+   protected AbstractAggregatingMetricsHandler(
+           CompletableFuture localRestAddress,
+           GatewayRetriever leaderRetriever,
+           Time timeout,
+           Map responseHeaders,
+           AbstractAggregatedMetricsHeaders messageHeaders,
+           Executor executor,
+   MetricFetcher 

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181401972
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
 ---
@@ -0,0 +1,58 @@
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Locale;
+
+/**
+ * TODO: add javadoc.
--- End diff --

This TODO placeholder should be replaced with a proper Javadoc.


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181411248
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler {
+
+   public AggregatingSubtasksMetricsHandler(
+           CompletableFuture localRestAddress,
+           GatewayRetriever leaderRetriever,
+           Time timeout,
+           Map responseHeaders,
+           Executor executor,
+           MetricFetcher fetcher) {
+       super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+   }
+
+   @Override
+   Collection getStores(MetricStore store, HandlerRequest request) {
+       JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+       JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+       Collection subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+       if (subtaskRanges.isEmpty()) {
+           return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
+       } else {
+           Iterable subtasks = getIntegerRangeFromString(subtaskRanges);
+           Collection subtaskStores = new ArrayList<>(8);
+           for (int subtask : subtasks) {
+               subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
+           }
+           return subtaskStores;
+       }
+   }
+
+   private Iterable getIntegerRangeFromString(Collection ranges) {
+       UnionIterator iterators = new UnionIterator<>();
+
+       for (String rawRange : ranges) {
+           try {
+               Iterator rangeIterator;
+   

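The quoted diff is cut off inside `getIntegerRangeFromString`. As a rough illustration of what expanding "a comma-separated list of integer ranges" such as `0-2,4-5` involves, here is a minimal sketch. It is not the code from the PR: the PR builds a lazy `UnionIterator`, whereas this version eagerly fills a list, and the class name, method name and error handling are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Sketch: eagerly expands range strings like "0-2" or "4" into subtask indices. */
public class SubtaskRangeSketch {

    static List<Integer> parseRanges(Collection<String> ranges) {
        List<Integer> result = new ArrayList<>();
        for (String rawRange : ranges) {
            String range = rawRange.trim();
            int dash = range.indexOf('-');
            if (dash < 0) {
                // A single index such as "4".
                result.add(Integer.parseInt(range));
            } else {
                // An inclusive range such as "0-2".
                int start = Integer.parseInt(range.substring(0, dash).trim());
                int end = Integer.parseInt(range.substring(dash + 1).trim());
                if (start > end) {
                    throw new IllegalArgumentException("Invalid range: " + rawRange);
                }
                for (int i = start; i <= end; i++) {
                    result.add(i);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [0, 1, 2, 4, 5]
        System.out.println(parseRanges(Arrays.asList("0-2", "4-5")));
    }
}
```

Malformed input surfaces as an `IllegalArgumentException` here (`NumberFormatException` is a subclass of it); how the real handler reports such input is not visible in the truncated diff.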
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-03 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5805

[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6 

## What is the purpose of the change

This PR ports the `AggregatingMetricsHandler` classes to flip6.

Additionally, this PR contains 2 minor changes:
* the `MessageParameter` constructor is now `protected`
* the converter methods of `QueryParameters` may now also throw `ConversionExceptions`, like their `PathParameter` counterpart
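
To illustrate the second point, a query-parameter converter that rejects bad input could look roughly like the sketch below. All names are hypothetical: `AggregationMode` and `ConversionFailure` are stand-ins invented for this example and not the classes added by the PR, and the stand-in exception is unchecked only to keep the sketch short.

```java
import java.util.Locale;

/** Sketch: converting a raw query-parameter value into an aggregation mode. */
public class AggregationModeSketch {

    /** Hypothetical enum; the constants mirror the "min", "max", "sum" and "avg" aggregations. */
    enum AggregationMode { MIN, MAX, SUM, AVG }

    /** Hypothetical stand-in for a conversion exception. */
    static class ConversionFailure extends RuntimeException {
        ConversionFailure(String message) {
            super(message);
        }
    }

    /** Parses case-insensitively and fails with a conversion error on unknown values. */
    static AggregationMode fromString(String value) {
        try {
            return AggregationMode.valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new ConversionFailure("Not a valid aggregation: " + value);
        }
    }

    /** Converts a mode back to its lower-case query-string form. */
    static String asString(AggregationMode mode) {
        return mode.name().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(asString(fromString("Max"))); // prints "max"
    }
}
```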

## Brief change log

* the `MessageParameter` constructor is now `protected`
* the converter methods of `QueryParameters` may now also throw `ConversionExceptions`, like their `PathParameter` counterpart
* port `AggregatingMetricsHandler` to flip6
  * duplicate `DoubleAccumulator` class
  * define headers, various parameters and response bodies
  * define handlers
    * conceptually they work like the legacy versions, but I had to beef up the aggregation factories a bit as the existing approach wasn't compatible with an actual constructor
  * add handlers to WebMonitorEndpoint
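
For readers unfamiliar with the handlers, the aggregation they perform can be sketched as follows. This is only an illustration and not the PR's implementation: it models each metric store as a hypothetical `Map<String, String>` of metric name to value and uses the JDK's `DoubleSummaryStatistics` in place of the `DoubleAccumulator` classes.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.DoubleSummaryStatistics;
import java.util.Map;

/** Sketch: aggregating one metric across several stores. */
public class MetricAggregationSketch {

    /** Collects min/max/sum/avg of a metric over all stores that contain it. */
    static DoubleSummaryStatistics aggregate(Collection<Map<String, String>> stores, String metric) {
        DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
        for (Map<String, String> store : stores) {
            String raw = store.get(metric);
            if (raw == null) {
                continue; // this store does not report the metric
            }
            try {
                stats.accept(Double.parseDouble(raw));
            } catch (NumberFormatException e) {
                // Assumption: non-numeric values are skipped rather than failing the request.
            }
        }
        return stats;
    }

    public static void main(String[] args) {
        Collection<Map<String, String>> stores = Arrays.asList(
            Collections.singletonMap("X", "1"),
            Collections.singletonMap("X", "2"));
        DoubleSummaryStatistics stats = aggregate(stores, "X");
        // Prints "1.0 2.0 3.0 1.5"
        System.out.println(stats.getMin() + " " + stats.getMax() + " " + stats.getSum() + " " + stats.getAverage());
    }
}
```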

## Verifying this change

This change added tests and can be verified as follows:

* run tests extending `AggregatingMetricsHandlerTestBase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (documented (via legacy documentation))


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8370

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5805.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5805


commit e238b85d12feda1f66de9ebf8aaaeaa223c3129d
Author: zentol 
Date:   2018-03-26T12:41:03Z

[hotfix][metrics] Make MessageParameter constructor protected

commit 188f5ef96bce2a7fa8ee382ab1d5fc900d078149
Author: zentol 
Date:   2018-03-26T12:41:25Z

[hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions

commit 85d00792ca34849034bc7c59a9b7d07ed4fff486
Author: zentol 
Date:   2018-03-28T10:52:07Z

[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6




---