[jira] [Commented] (FLINK-4347) Implement SlotManager core

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427574#comment-15427574
 ] 

ASF GitHub Bot commented on FLINK-4347:
---

GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/2388

[FLINK-4347][cluster management] Implement SlotManager core

Implements the core SlotManager logic in four main parts:
* handle slot requests initiated by the JobMaster
* sync slot status, triggered by the TaskExecutor's heartbeat
* handle slot requests that failed on the TaskExecutor's side
* handle resource (container) failures, triggered either by a timeout 
detector or by the cluster manager's master (e.g. the YARN master)

 I'm not sure whether this logic has been implemented in an overly complex 
way, since the local state is not always accurate in a distributed 
environment. The principles I used are: 
 * Always trust the TaskExecutor.
 * Generate each action based on the incoming message type (i.e., the 
interface that was called) and the information currently available, then 
update local information based on the incoming message.
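The two principles above could look roughly like the following sketch, in which a heartbeat report simply overwrites the slot manager's local view of that TaskExecutor's slots. `SlotView`, `SlotStatus`, and `onHeartbeat` are hypothetical names chosen for illustration, not the classes in this pull request; resource profiles and pending requests are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: reconcile local slot state with a TaskExecutor's heartbeat report. */
public class SlotView {

    /** Status of one slot as reported by its TaskExecutor; allocationId == null means free. */
    public static final class SlotStatus {
        final String slotId;
        final String allocationId;

        public SlotStatus(String slotId, String allocationId) {
            this.slotId = slotId;
            this.allocationId = allocationId;
        }
    }

    // slotId -> allocationId (null = free), as currently believed by the slot manager
    private final Map<String, String> localAllocations = new HashMap<>();

    /** Records that the slot manager asked a TaskExecutor to allocate this slot. */
    public void markAllocated(String slotId, String allocationId) {
        localAllocations.put(slotId, allocationId);
    }

    /**
     * Applies a heartbeat report. The TaskExecutor is always trusted, so every reported
     * slot overwrites the local entry. Returns the number of slots whose reported
     * allocation differed from the local view (an unknown slot counts as locally free).
     */
    public int onHeartbeat(Iterable<SlotStatus> report) {
        int corrected = 0;
        for (SlotStatus status : report) {
            String previous = localAllocations.put(status.slotId, status.allocationId);
            if (!Objects.equals(previous, status.allocationId)) {
                corrected++;
            }
        }
        return corrected;
    }

    public boolean isFree(String slotId) {
        return localAllocations.containsKey(slotId) && localAllocations.get(slotId) == null;
    }

    public String allocationOf(String slotId) {
        return localAllocations.get(slotId);
    }
}
```

Because the report overwrites rather than merges, replaying the same heartbeat twice leaves the view unchanged, which keeps the reconciliation idempotent.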

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink jira-4347

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2388.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2388


commit d9a055983c3ff7a1c3049c9403c970cf21173061
Author: Kurt Young 
Date:   2016-08-18T07:48:30Z

[FLINK-4347][cluster management] Implement SlotManager core




> Implement SlotManager core
> --
>
> Key: FLINK-4347
> URL: https://issues.apache.org/jira/browse/FLINK-4347
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> The slot manager is responsible for maintaining the list of slot requests and 
> slot allocations. It allows requesting slots from the registered 
> TaskExecutors and issues container allocation requests when there are 
> not enough available resources.
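The behaviour described above (serve a slot request from a registered TaskExecutor's free slot, otherwise ask the cluster for a new container) can be sketched as follows. This is a hypothetical, simplified model: `SlotRequestSketch`, `requestSlot`, `registerSlot`, and the container-allocator callback are illustrative names, and resource profiles are ignored (any free slot satisfies any request).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical sketch: match slot requests to free slots, else request a container. */
public class SlotRequestSketch {

    private final Deque<String> freeSlots = new ArrayDeque<>();       // ids of free slots
    private final Deque<String> pendingRequests = new ArrayDeque<>(); // unmatched request ids
    private final List<String> allocations = new ArrayList<>();       // "requestId->slotId"
    private final Consumer<String> containerAllocator;                // asks YARN/Mesos/... for a container

    public SlotRequestSketch(Consumer<String> containerAllocator) {
        this.containerAllocator = containerAllocator;
    }

    /** Handles a slot request from a JobMaster. Returns the assigned slot, if one was free. */
    public Optional<String> requestSlot(String requestId) {
        String slot = freeSlots.pollFirst();
        if (slot != null) {
            allocations.add(requestId + "->" + slot);
            return Optional.of(slot);
        }
        // Not enough available resources: remember the request and ask for a new container.
        pendingRequests.addLast(requestId);
        containerAllocator.accept(requestId);
        return Optional.empty();
    }

    /** Called when a TaskExecutor registers a free slot; the oldest pending request wins. */
    public void registerSlot(String slotId) {
        String request = pendingRequests.pollFirst();
        if (request != null) {
            allocations.add(request + "->" + slotId);
        } else {
            freeSlots.addLast(slotId);
        }
    }

    public List<String> allocations() {
        return allocations;
    }
}
```

A FIFO queue of pending requests is the simplest fairness policy; a real slot manager would also have to match resource profiles and cope with container allocations that never arrive.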



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2388: [FLINK-4347][cluster management] Implement SlotMan...

2016-08-18 Thread KurtYoung




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4427) Add slot / container releasing logic to SlotManager (Standalone / Yarn / Mesos)

2016-08-18 Thread Kurt Young (JIRA)
Kurt Young created FLINK-4427:
-

 Summary: Add slot / container releasing logic to SlotManager 
(Standalone / Yarn / Mesos)
 Key: FLINK-4427
 URL: https://issues.apache.org/jira/browse/FLINK-4427
 Project: Flink
  Issue Type: Sub-task
Reporter: Kurt Young


Currently, SlotManager / ResourceManager only have allocation logic. For some 
batch jobs, slots that have already finished should be released, which should 
in turn trigger container release in the different cluster modes.
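One possible shape for this releasing logic, sketched under assumptions: a slot that becomes free is recorded with a timestamp, and once every slot of a container has been free for longer than an idle timeout, the container is handed to a cluster-mode-specific release callback (standalone, YARN, Mesos). `IdleContainerReleaser`, the idle-timeout policy, and the callback are hypothetical; the issue itself does not prescribe a policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch: release containers whose slots have all been idle long enough. */
public class IdleContainerReleaser {

    private final long idleTimeoutMillis;
    private final Consumer<String> releaseContainer; // cluster-mode specific (standalone / YARN / Mesos)

    // containerId -> (slotId -> time the slot became free, or null while it is in use)
    private final Map<String, Map<String, Long>> slots = new HashMap<>();

    public IdleContainerReleaser(long idleTimeoutMillis, Consumer<String> releaseContainer) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.releaseContainer = releaseContainer;
    }

    public void slotInUse(String containerId, String slotId) {
        slots.computeIfAbsent(containerId, k -> new HashMap<>()).put(slotId, null);
    }

    public void slotFreed(String containerId, String slotId, long nowMillis) {
        slots.computeIfAbsent(containerId, k -> new HashMap<>()).put(slotId, nowMillis);
    }

    /** Releases every container whose slots have all been free for at least the timeout. */
    public List<String> checkIdle(long nowMillis) {
        List<String> released = new ArrayList<>();
        for (Map.Entry<String, Map<String, Long>> e : slots.entrySet()) {
            boolean allIdle = true;
            for (Long freeSince : e.getValue().values()) {
                if (freeSince == null || nowMillis - freeSince < idleTimeoutMillis) {
                    allIdle = false;
                    break;
                }
            }
            if (allIdle) {
                released.add(e.getKey());
            }
        }
        // Remove outside the iteration to avoid a ConcurrentModificationException.
        for (String containerId : released) {
            slots.remove(containerId);
            releaseContainer.accept(containerId);
        }
        return released;
    }
}
```

Releasing per container rather than per slot reflects that YARN and Mesos hand out whole containers; a standalone cluster could simply use a no-op callback.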





[jira] [Updated] (FLINK-4347) Implement SlotManager core

2016-08-18 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-4347:
--
Summary: Implement SlotManager core  (was: Implement SlotManager core for 
new ResourceManager)

> Implement SlotManager core
> --
>
> Key: FLINK-4347
> URL: https://issues.apache.org/jira/browse/FLINK-4347
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> The slot manager is responsible for maintaining the list of slot requests and 
> slot allocations. It allows requesting slots from the registered 
> TaskExecutors and issues container allocation requests when there are 
> not enough available resources.





[jira] [Commented] (FLINK-4222) Allow Kinesis configuration to get credentials from AWS Metadata

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427391#comment-15427391
 ] 

ASF GitHub Bot commented on FLINK-4222:
---

Github user chadnickbok commented on the issue:

https://github.com/apache/flink/pull/2260
  
Sorry I dropped off the map on this one - I actually got pulled onto a 
front-end project and ended up not having any time to follow this pull request 
:(

Glad to hear this PR is being considered for merging :D


> Allow Kinesis configuration to get credentials from AWS Metadata
> 
>
> Key: FLINK-4222
> URL: https://issues.apache.org/jira/browse/FLINK-4222
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.3
>Reporter: Nick Chadwick
>Priority: Minor
>  Labels: easyfix
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When deploying Flink TaskManagers in an EC2 environment, it would be nice to 
> be able to use the EC2 IAM Role credentials provided by the EC2 Metadata 
> service.
> This allows for credentials to be automatically discovered by services 
> running on EC2 instances at runtime, and removes the need to explicitly 
> create and assign credentials to TaskManagers.
> This should be a fairly small change to the configuration of the 
> flink-connector-kinesis, which will greatly improve the ease of deployment to 
> Amazon EC2
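With such a change, a job would no longer need to ship an access key and secret key; it would only select a credentials provider that can fall back to the instance profile. A minimal sketch, assuming the connector reads its settings from `java.util.Properties` under the keys `aws.region` and `aws.credentials.provider`, and that a provider value `AUTO` (delegating to the AWS SDK's default provider chain, which includes EC2 instance-profile credentials) is available in the connector version in use. Check the `AWSConfigConstants` class of your Flink release before relying on these names.

```java
import java.util.Properties;

/** Hypothetical sketch: Kinesis consumer properties relying on EC2 instance-profile credentials. */
public class KinesisInstanceProfileConfig {

    // Assumed key names; verify against AWSConfigConstants in your connector version.
    static final String AWS_REGION = "aws.region";
    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

    /** Builds consumer properties without any static access key / secret key. */
    public static Properties instanceProfileConfig(String region) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, region);
        // "AUTO" is assumed to resolve credentials through the AWS SDK default chain,
        // which on EC2 ends at the instance metadata service (IAM role credentials).
        props.setProperty(AWS_CREDENTIALS_PROVIDER, "AUTO");
        return props;
    }
}
```

The resulting properties would then be passed to the Kinesis consumer together with the stream name and deserialization schema; no secrets need to be distributed to the TaskManagers.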





[GitHub] flink issue #2260: [FLINK-4222] Allow Kinesis configuration to get credentia...

2016-08-18 Thread chadnickbok


[GitHub] flink issue #2260: [FLINK-4222] Allow Kinesis configuration to get credentia...

2016-08-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2260
  
@rmetzger Sure, that would be great. Thanks for your help with merging this.
Thanks for your contribution @chadnickbok :)




[jira] [Commented] (FLINK-4222) Allow Kinesis configuration to get credentials from AWS Metadata

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427390#comment-15427390
 ] 

ASF GitHub Bot commented on FLINK-4222:
---



> Allow Kinesis configuration to get credentials from AWS Metadata
> 
>
> Key: FLINK-4222
> URL: https://issues.apache.org/jira/browse/FLINK-4222
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.3
>Reporter: Nick Chadwick
>Priority: Minor
>  Labels: easyfix
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When deploying Flink TaskManagers in an EC2 environment, it would be nice to 
> be able to use the EC2 IAM Role credentials provided by the EC2 Metadata 
> service.
> This allows for credentials to be automatically discovered by services 
> running on EC2 instances at runtime, and removes the need to explicitly 
> create and assign credentials to TaskManagers.
> This should be a fairly small change to the configuration of the 
> flink-connector-kinesis, which will greatly improve the ease of deployment to 
> Amazon EC2





[jira] [Updated] (FLINK-4426) Unable to create proxy to the ResourceManager

2016-08-18 Thread Harpreet Sawhney (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Harpreet Sawhney updated FLINK-4426:

Affects Version/s: 1.1.1

> Unable to create proxy to the ResourceManager
> -
>
> Key: FLINK-4426
> URL: https://issues.apache.org/jira/browse/FLINK-4426
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.0.3, 1.1.1
> Environment: Flink 1.0.3 built with MapR  (2.7.0-mapr-1602)
>Reporter: Harpreet Sawhney
>
> We have a MapR cluster on which I am trying to run a single Flink job (from 
> the examples) on YARN.
> Running the example (./bin/flink run -m yarn-cluster -yn 4  
> ./examples/batch/WordCount.jar) fails with an "Unable to create proxy to the 
> ResourceManager null" error.

[jira] [Created] (FLINK-4426) Unable to create proxy to the ResourceManager

2016-08-18 Thread Harpreet Sawhney (JIRA)
Harpreet Sawhney created FLINK-4426:
---

 Summary: Unable to create proxy to the ResourceManager
 Key: FLINK-4426
 URL: https://issues.apache.org/jira/browse/FLINK-4426
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager
Affects Versions: 1.0.3
 Environment: Flink 1.0.3 built with MapR  (2.7.0-mapr-1602)
Reporter: Harpreet Sawhney


We have a MapR cluster on which I am trying to run a single Flink job (from 
the examples) on YARN.

Running the example (./bin/flink run -m yarn-cluster -yn 4  
./examples/batch/WordCount.jar) fails with an "Unable to create proxy to the 
ResourceManager null" error:

More detailed logs from the flink run below (server addresses removed):

=
2016-08-18 23:02:32,249 DEBUG 
org.apache.hadoop.metrics2.lib.MutableMetricsFactory  - field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(value=[Rate of 
successful kerberos logins and latency (milliseconds)], valueName=Time, about=, 
type=DEFAULT, always=false, sampleName=Ops)
2016-08-18 23:02:32,261 DEBUG 
org.apache.hadoop.metrics2.lib.MutableMetricsFactory  - field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(value=[Rate of failed 
kerberos logins and latency (milliseconds)], valueName=Time, about=, 
type=DEFAULT, always=false, sampleName=Ops)
2016-08-18 23:02:32,261 DEBUG 
org.apache.hadoop.metrics2.lib.MutableMetricsFactory  - field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(value=[GetGroups], 
valueName=Time, about=, type=DEFAULT, always=false, sampleName=Ops)
2016-08-18 23:02:32,263 DEBUG org.apache.hadoop.metrics2.impl.MetricsSystemImpl 
- UgiMetrics, User and group related metrics
2016-08-18 23:02:33,777 DEBUG com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils   
- init
2016-08-18 23:02:33,793 DEBUG com.mapr.baseutils.JVMProperties  
- Setting JVM property zookeeper.saslprovider to 
com.mapr.security.simplesasl.SimpleSaslProvider
2016-08-18 23:02:33,794 DEBUG com.mapr.baseutils.JVMProperties  
- Setting JVM property zookeeper.sasl.clientconfig to Client_simple
2016-08-18 23:02:33,794 DEBUG com.mapr.baseutils.JVMProperties  
- Setting JVM property java.security.auth.login.config to 
/opt/mapr/conf/mapr.login.conf
2016-08-18 23:02:33,797 DEBUG org.apache.hadoop.conf.Configuration  
- Loaded org.apache.hadoop.conf.CoreDefaultProperties
2016-08-18 23:02:33,805 DEBUG org.apache.hadoop.security.UserGroupInformation   
- HADOOP_SECURITY_AUTHENTICATION is set to: SIMPLE
2016-08-18 23:02:33,805 DEBUG org.apache.hadoop.security.UserGroupInformation   
- Login configuration entry is hadoop_simple
2016-08-18 23:02:33,806 DEBUG org.apache.hadoop.security.UserGroupInformation   
- authenticationMethod from JAAS configuration:SIMPLE
2016-08-18 23:02:33,867 DEBUG org.apache.hadoop.conf.Configuration  
- Loaded org.apache.hadoop.conf.CoreDefaultProperties
2016-08-18 23:02:33,875 DEBUG org.apache.hadoop.security.Groups 
-  Creating new Groups object
2016-08-18 23:02:33,878 DEBUG org.apache.hadoop.util.PerformanceAdvisory
- Falling back to shell based
2016-08-18 23:02:33,879 DEBUG 
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback  - Group 
mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
2016-08-18 23:02:33,934 DEBUG org.apache.hadoop.conf.Configuration  
- Loaded org.apache.hadoop.conf.CoreDefaultProperties
2016-08-18 23:02:34,002 DEBUG org.apache.hadoop.conf.Configuration  
- Loaded org.apache.hadoop.yarn.conf.YarnDefaultProperties
2016-08-18 23:02:34,021 DEBUG org.apache.hadoop.util.Shell  
- setsid exited with exit code 0
2016-08-18 23:02:34,047 DEBUG org.apache.hadoop.security.Groups 
- Group mapping 
impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; 
cacheTimeout=30; warningDeltaMs=5000
2016-08-18 23:02:34,058 DEBUG 
org.apache.hadoop.security.login.HadoopLoginModule- Priority 
principal search list is [class javax.security.auth.kerberos.KerberosPrincipal]
2016-08-18 23:02:34,058 DEBUG 
org.apache.hadoop.security.login.HadoopLoginModule- Additional 
principal search list is [class 

[jira] [Commented] (FLINK-305) Add Maven plugin to generate test coverage reports

2016-08-18 Thread Pavel Fadeev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427281#comment-15427281
 ] 

Pavel Fadeev commented on FLINK-305:


Dear [~fhueske],
I've just discovered that the code coverage integration (contributed above) 
was accidentally removed from the project. 

Do you know if this feature is still required?

> Add Maven plugin to generate test coverage reports
> --
>
> Key: FLINK-305
> URL: https://issues.apache.org/jira/browse/FLINK-305
> Project: Flink
>  Issue Type: Improvement
>Reporter: GitHub Import
>  Labels: github-import
> Fix For: pre-apache
>
>
> It would be good to have the option to generate test coverage reports for 
> Stratosphere.
> The Maven plugin for test coverage is called 
> [Cobertura|http://mojo.codehaus.org/cobertura-maven-plugin/].
> Maybe the generation can be enabled/disabled via a Maven profile so that 
> it does not influence the usual build process.
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/305
> Created by: [fhueske|https://github.com/fhueske]
> Labels: build system, enhancement, testing, 
> Created at: Thu Nov 28 13:55:52 CET 2013
> State: closed





[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427212#comment-15427212
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75395325
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java 
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.codec.http.router.Router;
+import io.netty.util.CharsetUtil;
+import org.jets3t.service.utils.Mimetypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpMethod.HEAD;
+import static io.netty.handler.codec.http.HttpResponseStatus.GONE;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+
+/**
+ * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher.
+ *
+ * More information:
+ * http://mesos.apache.org/documentation/latest/fetcher/
+ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
+ */
+public class MesosArtifactServer {
+
+   private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class);
+
+   private final Router router;
+
+   private ServerBootstrap bootstrap;
+
+   private Channel serverChannel;
+
+   private URL baseURL;
--- End diff --

I will tackle this later in a follow-up task, since I am making changes to the 
artifact server for the dispatcher's purposes.
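For readers unfamiliar with the artifact server's role: the Mesos fetcher downloads task artifacts from URIs, so the server's job is to map each registered local file to a URL under its `baseURL` and to answer `GET`/`HEAD` requests for it. The following is a JDK-only sketch of that idea, using `com.sun.net.httpserver` instead of the Netty router used in the diff; `SimpleArtifactServer` and `addFile` are hypothetical names and this is not the implementation under review.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, JDK-only sketch of an artifact server for the Mesos fetcher. */
public class SimpleArtifactServer {

    private final HttpServer server;
    private final URL baseURL;
    // remote path (e.g. "/flink.jar") -> local file
    private final Map<String, File> files = new ConcurrentHashMap<>();

    public SimpleArtifactServer(String host) throws IOException {
        // Port 0 lets the OS pick a free port.
        server = HttpServer.create(new InetSocketAddress(host, 0), 0);
        server.createContext("/", this::handle);
        server.start();
        baseURL = new URL("http", host, server.getAddress().getPort(), "/");
    }

    /** Registers a local file and returns the URL the fetcher should download. */
    public URL addFile(File localFile, String remotePath) throws MalformedURLException {
        String path = remotePath.startsWith("/") ? remotePath : "/" + remotePath;
        files.put(path, localFile);
        return new URL(baseURL, path);
    }

    public void stop() {
        server.stop(0);
    }

    private void handle(HttpExchange exchange) throws IOException {
        String method = exchange.getRequestMethod();
        if (!method.equals("GET") && !method.equals("HEAD")) {
            exchange.sendResponseHeaders(405, -1); // method not allowed, no body
            exchange.close();
            return;
        }
        File file = files.get(exchange.getRequestURI().getPath());
        if (file == null || !file.isFile()) {
            exchange.sendResponseHeaders(404, -1); // not found, no body
            exchange.close();
            return;
        }
        byte[] body = Files.readAllBytes(file.toPath());
        exchange.getResponseHeaders().set("Content-Type", "application/octet-stream");
        if (method.equals("HEAD")) {
            exchange.sendResponseHeaders(200, -1); // headers only
        } else {
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        }
        exchange.close();
    }
}
```

Resolving each remote path against a single `baseURL` is what lets the framework hand the fetcher plain HTTP URIs, regardless of where the files live locally.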


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread EronWright


[jira] [Assigned] (FLINK-4371) Add ability to take savepoints from job manager RESTful API

2016-08-18 Thread Zhenzhong Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu reassigned FLINK-4371:
---

Assignee: Zhenzhong Xu

> Add ability to take savepoints from job manager RESTful API
> ---
>
> Key: FLINK-4371
> URL: https://issues.apache.org/jira/browse/FLINK-4371
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Webfrontend
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
>
> subtask of FLINK-4336





[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427117#comment-15427117
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
@wuchong I've removed the test as you've suggested since it pretty much 
duplicates `TableSinkITCase`.


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.
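At its core, such a sink has to turn each row (the table's field names plus one row of values) into a JSON object and write the resulting bytes as a Kafka record value. A minimal, dependency-free sketch of that serialization step, assuming flat rows of strings, numbers, booleans, and nulls; `RowJsonSketch` is a hypothetical name, and a real implementation would more likely delegate to a JSON library such as Jackson.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical sketch: serialize a flat row into JSON bytes for a Kafka record value. */
public class RowJsonSketch {

    public static byte[] serialize(List<String> fieldNames, List<Object> values) {
        if (fieldNames.size() != values.size()) {
            throw new IllegalArgumentException("field names and values differ in arity");
        }
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < fieldNames.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            appendString(sb, fieldNames.get(i));
            sb.append(':');
            Object v = values.get(i);
            if (v == null || v instanceof Number || v instanceof Boolean) {
                // Written as JSON literals (NaN/Infinity are not valid JSON and not handled here).
                sb.append(v);
            } else {
                appendString(sb, v.toString());
            }
        }
        return sb.append('}').toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Writes a JSON string literal, escaping quotes, backslashes and control characters. */
    private static void appendString(StringBuilder sb, String s) {
        sb.append('"');
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }
}
```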





[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

2016-08-18 Thread mushketyk


[jira] [Commented] (FLINK-4395) Eager processing of late arrivals in CEP operator

2016-08-18 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427082#comment-15427082
 ] 

Ivan Mushketyk commented on FLINK-4395:
---

I'll work on this if no one minds.

> Eager processing of late arrivals in CEP operator
> -
>
> Key: FLINK-4395
> URL: https://issues.apache.org/jira/browse/FLINK-4395
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Till Rohrmann
>Priority: Minor
>
> At the moment, elements are only processed after the CEP operator has received 
> a watermark larger than their timestamps (in EventTime mode). For late 
> arrivals, this means that the late elements are not processed until the next 
> watermark arrives.
> To decrease the latency in this scenario, I propose to eagerly 
> process late arrivals in the CEP operator.
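A simplified model of the proposal, assuming a hypothetical `EagerLateBuffer` (not Flink's CEP operator) that hands elements to the pattern logic through a callback. On-time elements wait in a timestamp-ordered queue until a watermark passes them; an element that arrives with a timestamp at or below the current watermark is processed immediately instead of waiting for the next watermark.

```java
import java.util.PriorityQueue;
import java.util.function.Consumer;

/** Hypothetical element carrying an event timestamp. */
final class Event {
    final long timestamp;
    final String payload;

    Event(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

/** Hypothetical, simplified event-time buffer with eager handling of late elements. */
final class EagerLateBuffer {
    private final PriorityQueue<Event> pending =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Consumer<Event> process;
    private long currentWatermark = Long.MIN_VALUE;

    EagerLateBuffer(Consumer<Event> process) {
        this.process = process;
    }

    void onElement(Event e) {
        if (e.timestamp <= currentWatermark) {
            // Late arrival: waiting for the next watermark would not restore its order,
            // so hand it to the pattern logic right away.
            process.accept(e);
        } else {
            pending.add(e);
        }
    }

    void onWatermark(long watermark) {
        currentWatermark = watermark;
        // Release buffered elements in timestamp order up to the watermark
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            process.accept(pending.poll());
        }
    }
}
```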





[jira] [Assigned] (FLINK-4395) Eager processing of late arrivals in CEP operator

2016-08-18 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-4395:
-

Assignee: Ivan Mushketyk

> Eager processing of late arrivals in CEP operator
> -
>
> Key: FLINK-4395
> URL: https://issues.apache.org/jira/browse/FLINK-4395
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> At the moment, elements are only processed after the CEP operator has received 
> a watermark larger than their timestamps (in EventTime mode). For late 
> arrivals, this means that the late elements are not processed until the next 
> watermark arrives.
> To decrease the latency in this scenario, I propose to eagerly 
> process late arrivals in the CEP operator.





[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-08-18 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427080#comment-15427080
 ] 

Ivan Mushketyk commented on FLINK-2254:
---

Hi [~vkalavri],

I think I could work on this task.
Do you know the status of this task? Is anybody working on it right now? 
Does the design need some rework?

> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>  Labels: requires-design-doc
>
> A bipartite graph is a graph whose set of vertices can be divided into two 
> disjoint sets such that every edge with a source vertex in the first set has 
> its target vertex in the second set. We would like to support efficient 
> operations for this type of graph, along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 
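A minimal sketch of one possible representation, assuming hypothetical names (this is not Gelly's API): top and bottom vertex sets whose edges must go from a top vertex to a bottom vertex, plus the one-mode projection onto the top set, a common operation on two-mode networks.

```java
import java.util.*;

/** Hypothetical bipartite graph: every edge goes from a "top" vertex to a "bottom" vertex. */
final class BipartiteGraph<T, B> {
    private final Set<T> top;
    private final Set<B> bottom;
    // For each bottom vertex, the set of top vertices connected to it
    private final Map<B, Set<T>> topNeighbors = new HashMap<>();

    BipartiteGraph(Set<T> top, Set<B> bottom, List<? extends Map.Entry<T, B>> edges) {
        this.top = new HashSet<>(top);
        this.bottom = new HashSet<>(bottom);
        for (Map.Entry<T, B> e : edges) {
            // Enforce the invariant: source in the top set, target in the bottom set
            if (!this.top.contains(e.getKey()) || !this.bottom.contains(e.getValue())) {
                throw new IllegalArgumentException("edge " + e + " does not go from top to bottom");
            }
            topNeighbors.computeIfAbsent(e.getValue(), k -> new HashSet<>()).add(e.getKey());
        }
    }

    /**
     * One-mode projection onto the top set: two top vertices are connected if they
     * share at least one bottom neighbour. Each unordered pair is returned once.
     */
    Set<Set<T>> projectTop() {
        Set<Set<T>> projected = new HashSet<>();
        for (Set<T> group : topNeighbors.values()) {
            List<T> members = new ArrayList<>(group);
            for (int i = 0; i < members.size(); i++) {
                for (int j = i + 1; j < members.size(); j++) {
                    projected.add(new HashSet<>(Arrays.asList(members.get(i), members.get(j))));
                }
            }
        }
        return projected;
    }
}
```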





[jira] [Commented] (FLINK-4303) Add CEP examples

2016-08-18 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427068#comment-15427068
 ] 

Ivan Mushketyk commented on FLINK-4303:
---

I can add an example for CEP.
[~twalthr] do you have an interesting idea for what this example could be based 
on? In other words, would you like to see pattern detection in a particular 
domain?

> Add CEP examples
> 
>
> Key: FLINK-4303
> URL: https://issues.apache.org/jira/browse/FLINK-4303
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>
> Neither CEP Java nor CEP Scala contains a runnable example. The example on the 
> website is also not runnable without adding some additional code.





[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427045#comment-15427045
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2314
  
@rmetzger I performed the check as you described and everything seems to be 
fine.
I also updated the PR according to your suggestions.
Is this commit in good shape to merge?


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>






[GitHub] flink issue #2314: [FLINK-3298] Implement ActiveMQ streaming connector

2016-08-18 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2314
  
@rmetzger I performed the check as you described and everything seems to be 
fine.
I also updated the PR according to your suggestions.
Is this commit in good shape to merge?




[jira] [Comment Edited] (FLINK-4422) Convert all time interval measurements to System.nanoTime()

2016-08-18 Thread Sunny T (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426944#comment-15426944
 ] 

Sunny T edited comment on FLINK-4422 at 8/18/16 6:19 PM:
-

Hi [~StephanEwen], I would like to work on this. Does this task have any 
dependencies, or is it independent?

Thanks,
Sunny


was (Author: tsunny):
Hi [~StephanEwen], I would like to work on this. Does this task have any 
dependencies, or is it independent?

> Convert all time interval measurements to System.nanoTime()
> ---
>
> Key: FLINK-4422
> URL: https://issues.apache.org/jira/browse/FLINK-4422
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Priority: Minor
>
> In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is 
> monotonic. To measure delays and time intervals, {{System.nanoTime()}} is 
> hence reliable, while {{System.currentTimeMillis()}} is not.
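A minimal sketch of measuring an interval with `System.nanoTime()`, assuming a hypothetical `UpdateThrottle` helper (not part of Flink) of the kind used to rate-limit a periodic task such as a metrics fetch. Only the difference between two `nanoTime()` readings is meaningful, so the first call is tracked with a flag rather than by comparing against zero.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper: allows an action at most once per interval, using a monotonic clock. */
class UpdateThrottle {
    private final long intervalNanos;
    private final LongSupplier nanoClock;
    private long lastUpdateNanos;
    private boolean updatedOnce;

    UpdateThrottle(long interval, TimeUnit unit) {
        this(interval, unit, System::nanoTime);
    }

    // The clock is injectable so the behaviour can be tested deterministically
    UpdateThrottle(long interval, TimeUnit unit, LongSupplier nanoClock) {
        this.intervalNanos = unit.toNanos(interval);
        this.nanoClock = nanoClock;
    }

    /** Returns true if the caller should perform the update now. */
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        // Compare differences, never absolute nanoTime values: the origin is arbitrary
        if (!updatedOnce || now - lastUpdateNanos >= intervalNanos) {
            updatedOnce = true;
            lastUpdateNanos = now;
            return true;
        }
        return false;
    }
}
```

A caller would then write `if (throttle.tryAcquire()) { fetch(); }`; adjusting the wall clock cannot shorten or stretch the interval, unlike a check based on `System.currentTimeMillis()`.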





[jira] [Commented] (FLINK-4422) Convert all time interval measurements to System.nanoTime()

2016-08-18 Thread Sunny T (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426944#comment-15426944
 ] 

Sunny T commented on FLINK-4422:


Hi [~StephanEwen], I would like to work on this. Does this task have any 
dependencies, or is it independent?

> Convert all time interval measurements to System.nanoTime()
> ---
>
> Key: FLINK-4422
> URL: https://issues.apache.org/jira/browse/FLINK-4422
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Priority: Minor
>
> In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is 
> monotonic. To measure delays and time intervals, {{System.nanoTime()}} is 
> hence reliable, while {{System.currentTimeMillis()}} is not.





[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75360196
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
+
+   /**
+* Adds a metric to this MetricStore.
+*
+* @param name  the metric identifier
+* @param value the metric value
+*/
+   public void add(String name, Object value) {
--- End diff --

I think we are mixing 2 separate issues here:
A) the format of the transfer
B) how the retrieved metrics are added to the store

Your comment regarding reparsing the string for Histograms targets B. This 
can be fixed easily by providing specific `addHistogram/...` methods to the 
store.

Now, let's talk about A. Essentially, the goal was to reduce the overhead 
for the sender. If you look at the dumping procedure in the 
`MetricQueryService`, you will find that the current format requires the 
least amount of work for the sender, as far as I can tell. We don't create 
more elaborate containers (which, by the way, I would personally prefer too!) 
because each would be another short-lived object created for every metric.

Also, regarding the sender and receiver going out of sync: we can add a test 
to guarantee that they stay in sync.
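A minimal sketch of the two points above, assuming hypothetical names (`MetricDumpCodec`, `SimpleMetricStore`) and a made-up wire format, not the actual `MetricQueryService` dump. The sender writes (type tag, name, value) entries straight into one byte stream without allocating a container per metric, and the receiver dispatches on the tag to typed `add...` methods instead of reparsing strings.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical codec: metrics are written as (tag, name, value) entries into one stream. */
final class MetricDumpCodec {
    static final byte COUNTER = 0;
    static final byte GAUGE = 1;

    // Sender side: append directly to a shared stream, no per-metric objects
    static void writeCounter(DataOutputStream out, String name, long count) throws IOException {
        out.writeByte(COUNTER);
        out.writeUTF(name);
        out.writeLong(count);
    }

    static void writeGauge(DataOutputStream out, String name, String value) throws IOException {
        out.writeByte(GAUGE);
        out.writeUTF(name);
        out.writeUTF(value);
    }

    // Receiver side: dispatch on the tag to a typed add method
    static void readAll(byte[] dump, SimpleMetricStore store) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(dump));
        while (in.available() > 0) {
            byte tag = in.readByte();
            String name = in.readUTF();
            switch (tag) {
                case COUNTER: store.addCounter(name, in.readLong()); break;
                case GAUGE: store.addGauge(name, in.readUTF()); break;
                default: throw new IOException("unknown metric type tag " + tag);
            }
        }
    }
}

/** Hypothetical flat store; the MetricStore quoted above nests metrics by JobManager, TaskManagers and jobs. */
final class SimpleMetricStore {
    final Map<String, String> metrics = new HashMap<>();

    void addCounter(String name, long count) { metrics.put(name, Long.toString(count)); }

    void addGauge(String name, String value) { metrics.put(name, value); }
}
```

A round-trip test that writes entries with the sender methods and reads them back with `readAll` is one way to catch the sender and receiver drifting out of sync.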




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426941#comment-15426941
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75360196
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
+
+   /**
+* Adds a metric to this MetricStore.
+*
+* @param name  the metric identifier
+* @param value the metric value
+*/
+   public void add(String name, Object value) {
--- End diff --

I think we are mixing 2 separate issues here:
A) the format of the transfer
B) how the retrieved metrics are added to the store

Your comment regarding reparsing the string for Histograms targets B. This 
can be fixed easily by providing specific `addHistogram/...` methods to the 
store.

Now, let's talk about A. Essentially, the goal was to reduce the overhead 
for the sender. If you look at the dumping procedure in the 
`MetricQueryService`, you will find that the current format requires the 
least amount of work for the sender, as far as I can tell. We don't create 
more elaborate containers (which, by the way, I would personally prefer too!) 
because each would be another short-lived object created for every metric.

Also, regarding the sender and receiver going out of sync: we can add a test 
to guarantee that they stay in sync.


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface





[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426892#comment-15426892
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75355525
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.dispatch.OnSuccess;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ *
+ * Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
+ * the last call has passed.
+ */
+public class MetricFetcher {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+
+   private final JobManagerRetriever retriever;
+   private final ExecutionContext ctx;
+   private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
+
+   private MetricStore metrics = new MetricStore();
+
+   private long lastUpdateTime;
+
+   public MetricFetcher(JobManagerRetriever retriever, ExecutionContext ctx) {
+   this.retriever = Preconditions.checkNotNull(retriever);
+   this.ctx = Preconditions.checkNotNull(ctx);
+   }
+
+   /**
+* Returns the MetricStore containing all stored metrics.
+*
+* @return MetricStore containing all stored metrics;
+*/
+   public MetricStore getMetricStore() {
+   return metrics;
+   }
+
+   /**
+* This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
+*/
+   public void update() {
+   synchronized (this) {
+   long currentTime = System.currentTimeMillis();
+   if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+   lastUpdateTime = currentTime;
+   fetchMetrics();
+   }
+   }
+   }
+
+   private void fetchMetrics() {
+   try {
+   Option> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
+   if (jobManagerGatewayAndWebPort.isDefined()) {
+   ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
+
+   /**
+* Remove all metrics that belong to a job that is not running and no longer archived.
+*/
+   jobManager.ask(new RequestJobDetails(true, 

[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426894#comment-15426894
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75355630
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.dispatch.OnSuccess;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ *
+ * Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
+ * the last call has passed.
+ */
+public class MetricFetcher {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+
+   private final JobManagerRetriever retriever;
+   private final ExecutionContext ctx;
+   private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
+
+   private MetricStore metrics = new MetricStore();
+
+   private long lastUpdateTime;
+
+   public MetricFetcher(JobManagerRetriever retriever, ExecutionContext ctx) {
+   this.retriever = Preconditions.checkNotNull(retriever);
+   this.ctx = Preconditions.checkNotNull(ctx);
+   }
+
+   /**
+* Returns the MetricStore containing all stored metrics.
+*
+* @return MetricStore containing all stored metrics;
+*/
+   public MetricStore getMetricStore() {
+   return metrics;
+   }
+
+   /**
+* This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
+*/
+   public void update() {
+   synchronized (this) {
+   long currentTime = System.currentTimeMillis();
+   if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+   lastUpdateTime = currentTime;
+   fetchMetrics();
+   }
+   }
+   }
+
+   private void fetchMetrics() {
+   try {
+   Option> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
+   if (jobManagerGatewayAndWebPort.isDefined()) {
+   ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
+
+   /**
+* Remove all metrics that belong to a job that is not running and no longer archived.
+*/
+   jobManager.ask(new RequestJobDetails(true, 

[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75355630
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.dispatch.OnSuccess;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ *
+ * Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
+ * the last call has passed.
+ */
+public class MetricFetcher {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+
+   private final JobManagerRetriever retriever;
+   private final ExecutionContext ctx;
+   private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
+
+   private MetricStore metrics = new MetricStore();
+
+   private long lastUpdateTime;
+
+   public MetricFetcher(JobManagerRetriever retriever, ExecutionContext ctx) {
+   this.retriever = Preconditions.checkNotNull(retriever);
+   this.ctx = Preconditions.checkNotNull(ctx);
+   }
+
+   /**
+* Returns the MetricStore containing all stored metrics.
+*
+* @return MetricStore containing all stored metrics;
+*/
+   public MetricStore getMetricStore() {
+   return metrics;
+   }
+
+   /**
+* This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
+*/
+   public void update() {
+   synchronized (this) {
+   long currentTime = System.currentTimeMillis();
+   if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update
+   lastUpdateTime = currentTime;
+   fetchMetrics();
+   }
+   }
+   }
+
+   private void fetchMetrics() {
+   try {
+   Option> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
+   if (jobManagerGatewayAndWebPort.isDefined()) {
+   ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
+
+   /**
+* Remove all metrics that belong to a job that is not running and no longer archived.
+*/
+   jobManager.ask(new RequestJobDetails(true, true), timeout)
+   .onSuccess(new OnSuccess() {
+   @Override
+   public void onSuccess(Object result) throws 

[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426912#comment-15426912
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75356544
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 ---
@@ -33,19 +36,42 @@
  * group could for example include the task attempt number (more fine grained identification), or
  * exclude it (for continuity of the namespace across failure and recovery).
  */
-public abstract class ComponentMetricGroup extends AbstractMetricGroup {
+public abstract class ComponentMetricGroup extends AbstractMetricGroup {
--- End diff --

I'm not sure what you mean, but `JobMetricGroup` inherits from 
`ComponentMetricGroup`.
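The generic type parameters in the quoted declaration seem to have been stripped by the mail archive, which is why the removed and added lines read the same. As a minimal sketch of the general pattern only (all class names here are hypothetical simplifications, not Flink's metric groups), a group can carry its parent's concrete type as a type parameter, so that a subclass such as a job-level group can use its parent without casts:

```java
/** Hypothetical base group; P is the concrete type of the parent group. */
abstract class AbstractGroup<P extends AbstractGroup<?>> {
    protected final P parent; // null for the root group

    AbstractGroup(P parent) {
        this.parent = parent;
    }

    abstract String scopeComponent();

    /** Joins the scope components from the root down to this group. */
    String fullScope() {
        return parent == null ? scopeComponent() : parent.fullScope() + "." + scopeComponent();
    }
}

/** Hypothetical intermediate class; it only forwards the parent type parameter. */
abstract class ComponentGroup<P extends AbstractGroup<?>> extends AbstractGroup<P> {
    ComponentGroup(P parent) {
        super(parent);
    }
}

final class TaskManagerGroup extends ComponentGroup<AbstractGroup<?>> {
    final String hostname;

    TaskManagerGroup(String hostname) {
        super(null);
        this.hostname = hostname;
    }

    @Override
    String scopeComponent() { return hostname; }
}

final class JobGroup extends ComponentGroup<TaskManagerGroup> {
    final String jobName;

    JobGroup(TaskManagerGroup parent, String jobName) {
        super(parent);
        this.jobName = jobName;
    }

    @Override
    String scopeComponent() { return jobName; }

    /** The parent is typed as TaskManagerGroup, so its fields are reachable without a cast. */
    String hostname() { return parent.hostname; }
}
```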


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface





[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75356544
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 ---
@@ -33,19 +36,42 @@
  * group could for example include the task attempt number (more fine grained identification), or
  * exclude it (for continuity of the namespace across failure and recovery).
  */
-public abstract class ComponentMetricGroup extends AbstractMetricGroup {
+public abstract class ComponentMetricGroup extends AbstractMetricGroup {
--- End diff --

I'm not sure what you mean, but `JobMetricGroup` inherits from 
`ComponentMetricGroup`.




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426908#comment-15426908
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75356320
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 ---
@@ -54,13 +53,19 @@
  * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
  * These metrics simply do not get reported any more, when created on a closed group.
  */
-public abstract class AbstractMetricGroup implements MetricGroup {
+public abstract class AbstractMetricGroup implements MetricGroup {
--- End diff --

I will address this in #2300


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface





[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75356320
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 ---
@@ -54,13 +53,19 @@
  * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
  * These metrics simply do not get reported any more, when created on a closed group.
  */
-public abstract class AbstractMetricGroup implements MetricGroup {
+public abstract class AbstractMetricGroup implements MetricGroup {
--- End diff --

I will address this in #2300




[jira] [Closed] (FLINK-3397) Failed streaming jobs should fall back to the most recent checkpoint/savepoint

2016-08-18 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-3397.
--
Resolution: Fixed
  Assignee: (was: ramkrishna.s.vasudevan)

Fixed as part of `FLINK-4322`.

> Failed streaming jobs should fall back to the most recent checkpoint/savepoint
> --
>
> Key: FLINK-3397
> URL: https://issues.apache.org/jira/browse/FLINK-3397
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.0.0
>Reporter: Gyula Fora
>Priority: Minor
> Attachments: FLINK-3397.pdf
>
>
> The current fallback behaviour in case of a streaming job failure is slightly 
> counterintuitive:
> If a job fails, it falls back to the most recent checkpoint (if any), even 
> if a more recent savepoint was taken. This means that the system does not 
> regard savepoints as checkpoints, but only as points from which a job can be 
> manually restarted.
> I suggest changing this so that savepoints are also regarded as checkpoints 
> in case of a failure and are also used to automatically restore the 
> streaming job.
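The proposed behaviour amounts to a different selection rule on failure: restore from the newest completed snapshot, whether it is a checkpoint or a savepoint. Below is a minimal sketch contrasting the two rules. It assumes a hypothetical `Snapshot` type whose IDs come from a single increasing sequence shared by checkpoints and savepoints. It is illustrative only and is not Flink's checkpoint coordinator API.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical snapshot record; checkpoints and savepoints share one ID sequence. */
final class Snapshot {
    final long checkpointId;
    final boolean savepoint;

    Snapshot(long checkpointId, boolean savepoint) {
        this.checkpointId = checkpointId;
        this.savepoint = savepoint;
    }
}

final class RestoreSelector {
    /** Behaviour described in the issue: savepoints are ignored during automatic recovery. */
    static Optional<Snapshot> latestCheckpointOnly(List<Snapshot> snapshots) {
        return snapshots.stream()
                .filter(s -> !s.savepoint)
                .max(Comparator.comparingLong(s -> s.checkpointId));
    }

    /** Proposed behaviour: any completed snapshot, checkpoint or savepoint, is a restore candidate. */
    static Optional<Snapshot> latestAny(List<Snapshot> snapshots) {
        return snapshots.stream().max(Comparator.comparingLong(s -> s.checkpointId));
    }
}
```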





[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75356190
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricQueryService.java
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and receives the following messages:
+ * - {@code Tuple3 => Notification of added Metric}
+ * - {@code Metric => Notification of removed Metric}
+ * - {@code ActorRef => Query for metric dump}
+ */
+public class MetricQueryService extends UntypedActor {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+   public static final byte CATEGORY_COUNTER = 0;
+   public static final byte CATEGORY_GAUGE = 1;
+   public static final byte CATEGORY_HISTOGRAM = 2;
+
+   private final Map gauges = new HashMap<>();
+   private final Map counters = new HashMap<>();
+   private final Map histograms = new HashMap<>();
+
+   private final List toRemove = new ArrayList<>();
+
+   @Override
+   public void onReceive(Object message) throws Exception {
+   try {
+   if (message instanceof Tuple3) { // add metric
--- End diff --

agreed.
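The class Javadoc describes a small protocol: a `Tuple3` adds a metric, a bare `Metric` removes it, and an `ActorRef` requests a dump. Below is a minimal, Akka-free sketch of that type-based dispatch. It assumes hypothetical `AddMetric`, `RemoveMetric` and `DumpRequest` message classes in place of the raw `Tuple3`, `Metric` and `ActorRef` types; dedicated message classes are one way to avoid matching on a generic tuple. For brevity, the sketch removes metrics immediately and models each metric as a `LongSupplier` that is read at dump time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical "metric added" message: a name plus a live value source. */
final class AddMetric {
    final String name;
    final LongSupplier metric;

    AddMetric(String name, LongSupplier metric) {
        this.name = name;
        this.metric = metric;
    }
}

/** Hypothetical "metric removed" message. */
final class RemoveMetric {
    final String name;

    RemoveMetric(String name) {
        this.name = name;
    }
}

/** Hypothetical dump request; replyTo plays the role of the sender's ActorRef. */
final class DumpRequest {
    final Consumer<Map<String, Long>> replyTo;

    DumpRequest(Consumer<Map<String, Long>> replyTo) {
        this.replyTo = replyTo;
    }
}

/** Single-threaded handler mirroring onReceive's instanceof dispatch. */
final class QueryServiceSketch {
    private final Map<String, LongSupplier> metrics = new LinkedHashMap<>();
    private final List<String> warnings = new ArrayList<>();

    void onReceive(Object message) {
        if (message instanceof AddMetric) {
            AddMetric add = (AddMetric) message;
            metrics.put(add.name, add.metric);
        } else if (message instanceof RemoveMetric) {
            metrics.remove(((RemoveMetric) message).name);
        } else if (message instanceof DumpRequest) {
            // Values are read only when a dump is requested, so the reply reflects current state.
            Map<String, Long> dump = new LinkedHashMap<>();
            for (Map.Entry<String, LongSupplier> e : metrics.entrySet()) {
                dump.put(e.getKey(), e.getValue().getAsLong());
            }
            ((DumpRequest) message).replyTo.accept(dump);
        } else {
            // Mirrors the LOG.warn branch for unknown messages.
            warnings.add("received an invalid message: " + message);
        }
    }

    List<String> warnings() {
        return warnings;
    }
}
```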




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426906#comment-15426906
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75356190
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricQueryService.java
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and receives the following messages:
+ * - {@code Tuple3 => Notification of added Metric}
+ * - {@code Metric => Notification of removed Metric}
+ * - {@code ActorRef => Query for metric dump}
+ */
+public class MetricQueryService extends UntypedActor {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+   public static final byte CATEGORY_COUNTER = 0;
+   public static final byte CATEGORY_GAUGE = 1;
+   public static final byte CATEGORY_HISTOGRAM = 2;
+
+   private final Map gauges = new HashMap<>();
+   private final Map counters = new HashMap<>();
+   private final Map histograms = new HashMap<>();
+
+   private final List toRemove = new ArrayList<>();
+
+   @Override
+   public void onReceive(Object message) throws Exception {
+   try {
+   if (message instanceof Tuple3) { // add metric
--- End diff --

agreed.







[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426899#comment-15426899
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75355826
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricHandlerTest.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricHandlerTest {
+   @Test
+   public void testHandleRequest() throws Exception {
+   MetricFetcher fetcher = new MetricFetcher(mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+   MetricStore store = fetcher.getMetricStore();
+
+   JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+   store.add("0:abc.metric1", 0);
+   store.add("1:tmid:abc.metric2", 1);
+   store.add("2:jobid:abc.metric3", 2);
+   store.add("3:jobid:taskid:subindex:abc.metric4", 3);
+   store.add("4:jobid:taskid:subindex:opname:abc.metric5", 4);
+
+   Map pathParams = new HashMap<>();
+   Map queryParams = new HashMap<>();
+
+   pathParams.put("jobid", "jobid");
+   pathParams.put("vertexid", "taskid");
+
+   String availableList = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\"}," +
+   "{\"id\":\"subindex.abc.metric4\"}" +
+   "]",
+   availableList);
+
+   queryParams.put("get", "subindex.opname.abc.metric5");
+
+   String metricValue = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\",\"value\":\"4\"}" +
+   "]"
+   , metricValue
+   );
+
+   queryParams.put("get", "subindex.opname.abc.metric5,subindex.abc.metric4");
+
+   String metricValues = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\",\"value\":\"4\"}," +
+   "{\"id\":\"subindex.abc.metric4\",\"value\":\"3\"}" +
+   "]",
+   metricValues
+   );
+   }
+
+   @Test
+   public void testInvalidListDoesNotFail() {
+   MetricFetcher fetcher = new MetricFetcher(mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+   MetricStore store = fetcher.getMetricStore();
+
+   JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+   store.add("0:abc.metric1", 0);
+   store.add("1:tmid:abc.metric2", 1);
+   store.add("2:jobid:abc.metric3", 2);
+   store.add("3:jobid:taskid:subindex:abc.metric4", 3);
+   store.add("4:jobid:taskid:subindex:opname:abc.metric5", 4);
+
+   Map pathParams = new HashMap<>();
+   Map queryParams = new HashMap<>();
+
+   pathParams.put("jobid", "jobid");
+   pathParams.put("vertexid", "taskid");
  
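The store keys in this test appear to follow a colon-separated layout: a leading scope category, then the scope components, then the metric name. Judging by the keys, the categories are 0 = JobManager, 1 = TaskManager, 2 = job, 3 = task and 4 = operator. The expected handler output suggests that a job-vertex request returns the task and operator metrics of that vertex, with the job and vertex IDs stripped. Below is a minimal sketch of that mapping. The layout and the hypothetical `ScopedKey` class are inferred from the test data, not taken from `MetricStore`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical parser for keys like "3:jobid:taskid:subindex:abc.metric4" (layout inferred from the test). */
final class ScopedKey {
    final int category;        // inferred: 0=JobManager, 1=TaskManager, 2=job, 3=task, 4=operator
    final List<String> scope;  // components between the category and the metric name
    final String name;         // trailing metric name, e.g. "abc.metric4"

    private ScopedKey(int category, List<String> scope, String name) {
        this.category = category;
        this.scope = scope;
        this.name = name;
    }

    static ScopedKey parse(String key) {
        String[] parts = key.split(":");
        List<String> scope = new ArrayList<>(Arrays.asList(parts).subList(1, parts.length - 1));
        return new ScopedKey(Integer.parseInt(parts[0]), scope, parts[parts.length - 1]);
    }

    /** ID as exposed for a job vertex, or null if the key does not belong to that vertex. */
    String vertexMetricId(String jobId, String vertexId) {
        boolean taskOrOperator = category == 3 || category == 4;
        if (!taskOrOperator || !scope.get(0).equals(jobId) || !scope.get(1).equals(vertexId)) {
            return null;
        }
        // Drop job and vertex IDs; keep the subtask index (and operator name) as a prefix.
        List<String> id = new ArrayList<>(scope.subList(2, scope.size()));
        id.add(name);
        return String.join(".", id);
    }
}
```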

[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75355826
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricHandlerTest.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricHandlerTest {
+   @Test
+   public void testHandleRequest() throws Exception {
+   MetricFetcher fetcher = new MetricFetcher(mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+   MetricStore store = fetcher.getMetricStore();
+
+   JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+   store.add("0:abc.metric1", 0);
+   store.add("1:tmid:abc.metric2", 1);
+   store.add("2:jobid:abc.metric3", 2);
+   store.add("3:jobid:taskid:subindex:abc.metric4", 3);
+   store.add("4:jobid:taskid:subindex:opname:abc.metric5", 4);
+
+   Map pathParams = new HashMap<>();
+   Map queryParams = new HashMap<>();
+
+   pathParams.put("jobid", "jobid");
+   pathParams.put("vertexid", "taskid");
+
+   String availableList = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\"}," +
+   "{\"id\":\"subindex.abc.metric4\"}" +
+   "]",
+   availableList);
+
+   queryParams.put("get", "subindex.opname.abc.metric5");
+
+   String metricValue = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\",\"value\":\"4\"}" +
+   "]"
+   , metricValue
+   );
+
+   queryParams.put("get", "subindex.opname.abc.metric5,subindex.abc.metric4");
+
+   String metricValues = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\",\"value\":\"4\"}," +
+   "{\"id\":\"subindex.abc.metric4\",\"value\":\"3\"}" +
+   "]",
+   metricValues
+   );
+   }
+
+   @Test
+   public void testInvalidListDoesNotFail() {
+   MetricFetcher fetcher = new MetricFetcher(mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+   MetricStore store = fetcher.getMetricStore();
+
+   JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+   store.add("0:abc.metric1", 0);
+   store.add("1:tmid:abc.metric2", 1);
+   store.add("2:jobid:abc.metric3", 2);
+   store.add("3:jobid:taskid:subindex:abc.metric4", 3);
+   store.add("4:jobid:taskid:subindex:opname:abc.metric5", 4);
+
+   Map pathParams = new HashMap<>();
+   Map queryParams = new HashMap<>();
+
+   pathParams.put("jobid", "jobid");
+   pathParams.put("vertexid", "taskid");
+
+   //-invalid variable
+   pathParams.put("jobid", "nonexistent");
+
+   try {
+   handler.handleRequest(pathParams, queryParams, null);
--- End diff --
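The test's `get` query parameter takes a comma-separated list of metric IDs. It expects back a JSON array of objects with `id` and `value` fields, in request order. Below is a minimal sketch of building that response with `StringJoiner`. It assumes, hypothetically, that unknown IDs are skipped rather than reported as errors, and that IDs and values need no JSON escaping.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical response builder for a comma-separated "get" parameter. */
final class MetricJson {
    static String values(String getParam, Map<String, String> metrics) {
        // An empty joiner still renders as "[]".
        StringJoiner json = new StringJoiner(",", "[", "]");
        for (String id : getParam.split(",")) {
            String value = metrics.get(id);
            if (value != null) {
                // Assumption: ids and values contain no characters that need JSON escaping.
                json.add("{\"id\":\"" + id + "\",\"value\":\"" + value + "\"}");
            }
        }
        return json.toString();
    }
}
```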


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75355525
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.dispatch.OnSuccess;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ *
+ * Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that sufficient time has passed
+ * since the last fetch.
+ */
+public class MetricFetcher {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+
+   private final JobManagerRetriever retriever;
+   private final ExecutionContext ctx;
+   private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
+
+   private MetricStore metrics = new MetricStore();
+
+   private long lastUpdateTime;
+
+   public MetricFetcher(JobManagerRetriever retriever, ExecutionContext ctx) {
+   this.retriever = Preconditions.checkNotNull(retriever);
+   this.ctx = Preconditions.checkNotNull(ctx);
+   }
+
+   /**
+* Returns the MetricStore containing all stored metrics.
+*
+* @return MetricStore containing all stored metrics.
+*/
+   public MetricStore getMetricStore() {
+   return metrics;
+   }
+
+   /**
+* This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
+*/
+   public void update() {
+   synchronized (this) {
+   long currentTime = System.currentTimeMillis();
+   if (currentTime - lastUpdateTime > 1) { // 10 seconds have passed since the last update
+   lastUpdateTime = currentTime;
+   fetchMetrics();
+   }
+   }
+   }
+
+   private void fetchMetrics() {
+   try {
+   Option> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
+   if (jobManagerGatewayAndWebPort.isDefined()) {
+   ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
+
+   /**
+* Remove all metrics that belong to a job that is not running and no longer archived.
+*/
+   jobManager.ask(new RequestJobDetails(true, true), timeout)
+   .onSuccess(new OnSuccess() {
+   @Override
+   public void onSuccess(Object result) throws
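`update()` throttles fetching. Callers may invoke it on every request, but a fetch only happens once the interval since the previous fetch has elapsed; the inline comment in the diff describes 10 seconds. Below is a minimal sketch of the same check. It assumes a hypothetical `ThrottledFetcher` with an injected clock, so the interval can be exercised without sleeping.

```java
import java.util.function.LongSupplier;

/** Hypothetical throttle mirroring update(): fetch at most once per interval, however often it is called. */
final class ThrottledFetcher {
    private final LongSupplier clock;       // e.g. System::currentTimeMillis in production
    private final long minIntervalMillis;   // 10_000 for the 10 seconds named in the diff's comment
    private final Runnable fetch;
    private long lastUpdateTime;            // starts at 0, so the first call always fetches

    ThrottledFetcher(LongSupplier clock, long minIntervalMillis, Runnable fetch) {
        this.clock = clock;
        this.minIntervalMillis = minIntervalMillis;
        this.fetch = fetch;
    }

    void update() {
        synchronized (this) {
            long currentTime = clock.getAsLong();
            if (currentTime - lastUpdateTime > minIntervalMillis) {
                // Record the time before fetching so concurrent callers see the new timestamp.
                lastUpdateTime = currentTime;
                fetch.run();
            }
        }
    }
}
```

In the diff, `fetchMetrics()` only issues asynchronous `ask` calls, so holding the lock while it runs is cheap. A blocking fetch inside `synchronized` would instead stall every concurrent caller.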

[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426885#comment-15426885
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75354845
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -47,4 +47,21 @@ object Messages {
* @return The Acknowledge case object instance.
*/
   def getAcknowledge(): Acknowledge.type = Acknowledge
+
+  /**
+* Signals that the receiver (JobManager/TaskManager) should transmit a dump of the
+* registered metrics.
+* 
+* This message may be sent regularly by the WebInterface.
+*/
+  case object MetricRequest
+
+  /**
+* Accessor for the case object instance, to simplify Java interoperability.
+*
+* @return The MetricRequest case object instance.
+*/
+  def getRequestMetrics(): AnyRef = {
--- End diff --

sure
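`getRequestMetrics()` exists because a Scala `case object` is awkward to reach from Java. The compiler emits a class whose name ends in `$`, and the single instance lives in its static `MODULE$` field. Below is a minimal Java analogue of the pattern: one shared instance, a static accessor, and a receiver that matches the message by identity. The classes `MetricRequest`, `MessageAccessors` and `MetricRequestReceiver` are hypothetical.

```java
/** Hypothetical Java analogue of a Scala case object: one shared, immutable instance. */
final class MetricRequest {
    // Scala's compiled case object exposes its instance via a static MODULE$ field; INSTANCE plays that role.
    static final MetricRequest INSTANCE = new MetricRequest();

    private MetricRequest() {
    }

    @Override
    public String toString() {
        return "MetricRequest";
    }
}

/** Hypothetical accessor holder mirroring Messages.getRequestMetrics(). */
final class MessageAccessors {
    static Object getRequestMetrics() {
        return MetricRequest.INSTANCE;
    }
}

/** Hypothetical receiver: a singleton message can be matched by identity. */
final class MetricRequestReceiver {
    static String handle(Object message) {
        return message == MetricRequest.INSTANCE ? "send dump" : "unhandled";
    }
}
```

Returning `Object` mirrors the `AnyRef` signature in the diff. By contrast, `getAcknowledge()` above it returns the precise `Acknowledge.type`.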







[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426884#comment-15426884
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75354763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricQueryService.java
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and receives the following messages:
+ * - {@code Tuple3 => Notification of added Metric}
+ * - {@code Metric => Notification of removed Metric}
+ * - {@code ActorRef => Query for metric dump}
+ */
+public class MetricQueryService extends UntypedActor {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+   public static final byte CATEGORY_COUNTER = 0;
+   public static final byte CATEGORY_GAUGE = 1;
+   public static final byte CATEGORY_HISTOGRAM = 2;
+
+   private final Map gauges = new HashMap<>();
+   private final Map counters = new HashMap<>();
+   private final Map histograms = new HashMap<>();
+
+   private final List toRemove = new ArrayList<>();
+
+   @Override
+   public void onReceive(Object message) throws Exception {
+   try {
+   if (message instanceof Tuple3) { // add metric
+   Tuple3 tuple = (Tuple3) message;
+
+   String metricName = tuple.f0;
+   Metric metric = tuple.f1;
+   AbstractMetricGroup group = tuple.f2;
+
+   String name = group.getQueryServiceMetricIdentifier(metricName, new CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replaceAll("[ :.,]", "_");
+   }
+   });
+
+   if (metric instanceof Counter) {
+   counters.put((Counter) metric, name);
+   } else if (metric instanceof Gauge) {
+   gauges.put((Gauge) metric, name);
+   } else if (metric instanceof Histogram) {
+   histograms.put((Histogram) metric, name);
+   }
+   } else if (message instanceof Metric) { // remove metric
+   toRemove.add((Metric) message);
+   } else if (message instanceof ActorRef) { // create dump
+   Object[] dump = createDump();
+   
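On registration, the actor builds the metric's query name through a `CharacterFilter`. The filter replaces spaces, colons, dots and commas with `_`. The actor then files the metric into a per-type map, and metrics of any other type are silently ignored. Below is a minimal sketch of both steps. It assumes hypothetical `SketchCounter`, `SketchGauge` and `SketchHistogram` marker types in place of Flink's metric interfaces.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for Flink's metric interfaces. */
interface SketchMetric {}
final class SketchCounter implements SketchMetric {}
final class SketchGauge implements SketchMetric {}
final class SketchHistogram implements SketchMetric {}

final class RegistrationSketch {
    // Per-type maps from metric instance to its query-service name, as in the diff.
    final Map<SketchCounter, String> counters = new HashMap<>();
    final Map<SketchGauge, String> gauges = new HashMap<>();
    final Map<SketchHistogram, String> histograms = new HashMap<>();

    // Same replacement as the anonymous CharacterFilter: space, colon, dot and comma become '_'.
    static String filterCharacters(String input) {
        return input.replaceAll("[ :.,]", "_");
    }

    /** Routes a metric into the map for its type; any other type is dropped, as in the diff. */
    void add(String name, SketchMetric metric) {
        if (metric instanceof SketchCounter) {
            counters.put((SketchCounter) metric, name);
        } else if (metric instanceof SketchGauge) {
            gauges.put((SketchGauge) metric, name);
        } else if (metric instanceof SketchHistogram) {
            histograms.put((SketchHistogram) metric, name);
        }
    }
}
```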

[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75354845
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -47,4 +47,21 @@ object Messages {
* @return The Acknowledge case object instance.
*/
   def getAcknowledge(): Acknowledge.type = Acknowledge
+
+  /**
+* Signals that the receiver (JobManager/TaskManager) should transmit a dump of the
+* registered metrics.
+* 
+* This message may be sent regularly by the WebInterface.
+*/
+  case object MetricRequest
+
+  /**
+* Accessor for the case object instance, to simplify Java interoperability.
+*
+* @return The MetricRequest case object instance.
+*/
+  def getRequestMetrics(): AnyRef = {
--- End diff --

sure




[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75354763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricQueryService.java
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and receives the following messages:
+ * - {@code Tuple3 => Notification of added Metric}
+ * - {@code Metric => Notification of removed Metric}
+ * - {@code ActorRef => Query for metric dump}
+ */
+public class MetricQueryService extends UntypedActor {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+   public static final byte CATEGORY_COUNTER = 0;
+   public static final byte CATEGORY_GAUGE = 1;
+   public static final byte CATEGORY_HISTOGRAM = 2;
+
+   private final Map gauges = new HashMap<>();
+   private final Map counters = new HashMap<>();
+   private final Map histograms = new HashMap<>();
+
+   private final List toRemove = new ArrayList<>();
+
+   @Override
+   public void onReceive(Object message) throws Exception {
+   try {
+   if (message instanceof Tuple3) { // add metric
+   Tuple3 tuple = (Tuple3) message;
+
+   String metricName = tuple.f0;
+   Metric metric = tuple.f1;
+   AbstractMetricGroup group = tuple.f2;
+
+   String name = group.getQueryServiceMetricIdentifier(metricName, new CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replaceAll("[ :.,]", "_");
+   }
+   });
+
+   if (metric instanceof Counter) {
+   counters.put((Counter) metric, name);
+   } else if (metric instanceof Gauge) {
+   gauges.put((Gauge) metric, name);
+   } else if (metric instanceof Histogram) {
+   histograms.put((Histogram) metric, name);
+   }
+   } else if (message instanceof Metric) { // remove metric
+   toRemove.add((Metric) message);
+   } else if (message instanceof ActorRef) { // create dump
+   Object[] dump = createDump();
+   ((ActorRef) message).tell(dump, getSelf());
+   removeMetrics();
+   } else {
+   LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
   
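The dump handler sends the dump first and only then calls `removeMetrics()`, while a removal message merely appends to `toRemove`. As a result, a metric removed between two queries still appears in the next dump and disappears afterwards. Below is a minimal sketch of that deferred removal. It assumes a hypothetical `DeferredRemovalSketch` keyed by metric name, whereas the diff keys its maps by `Metric` instance.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of createDump() followed by removeMetrics(). */
final class DeferredRemovalSketch {
    private final Map<String, Long> metrics = new LinkedHashMap<>();
    private final List<String> toRemove = new ArrayList<>();

    void add(String name, long value) {
        metrics.put(name, value);
    }

    /** Removal is only recorded; the metric stays visible until the next dump. */
    void remove(String name) {
        toRemove.add(name);
    }

    /** Builds the dump first, then applies pending removals. */
    Map<String, Long> dump() {
        Map<String, Long> snapshot = new LinkedHashMap<>(metrics);
        for (String name : toRemove) {
            metrics.remove(name);
        }
        toRemove.clear();
        return snapshot;
    }
}
```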

[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426877#comment-15426877
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75354216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 ---
@@ -61,9 +58,7 @@ public TaskManagerJobMetricGroup(
JobID jobId,
@Nullable String jobName) {
 
-   super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
-
-   this.parent = checkNotNull(parent);
+   super(registry, checkNotNull(parent), jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
--- End diff --

`parent` is also used in `scopeFormat.formatScope`, so I wanted the null 
check to happen before that call is made.
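Putting `checkNotNull(parent)` in an earlier argument position is enough because Java evaluates argument lists strictly left to right (JLS §15.7.4), including in `super(...)` calls. The sketch below uses hypothetical stand-ins (`check`, `formatScope`, `Group`) for the real constructor and scope formatter and records the order in which they run:

```java
import java.util.Objects;

/** Hypothetical stand-ins showing left-to-right argument evaluation. */
public class ArgumentOrderDemo {

    static final StringBuilder trace = new StringBuilder();

    /** Plays the role of checkNotNull(parent). */
    static Object check(Object parent) {
        trace.append("check;");
        return Objects.requireNonNull(parent, "parent");
    }

    /** Plays the role of scopeFormat.formatScope(parent, ...). */
    static String formatScope(Object parent) {
        trace.append("format;");
        return String.valueOf(parent).toUpperCase();
    }

    static final class Group {
        final Object parent;
        final String scope;

        Group(Object parent, String scope) {
            this.parent = parent;
            this.scope = scope;
        }
    }

    static Group create(Object parent) {
        // check(...) is the earlier argument, so it runs (and may throw) before formatScope(...)
        return new Group(check(parent), formatScope(parent));
    }
}
```

Passing `null` to `create` throws from `check` without ever reaching `formatScope`.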


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75354216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 ---
@@ -61,9 +58,7 @@ public TaskManagerJobMetricGroup(
JobID jobId,
@Nullable String jobName) {
 
-   super(registry, jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
-
-   this.parent = checkNotNull(parent);
+   super(registry, checkNotNull(parent), jobId, jobName, scopeFormat.formatScope(parent, jobId, jobName));
--- End diff --

`parent` is also used in `scopeFormat.formatScope`, so I wanted the null 
check to happen before that call is made.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426870#comment-15426870
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75353675
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
+
+   /**
+* Adds a metric to this MetricStore.
+*
+* @param name  the metric identifier
+* @param value the metric value
+*/
+   public void add(String name, Object value) {
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   try {
+   String[] components = name.split(":");
--- End diff --

yes, the MetricQueryService does that
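Assuming the question was whether `:` can occur inside a single name component, here is a sketch of why the split is safe. If every component passes through the same filter that the MetricQueryService applies (`replaceAll("[ :.,]", "_")`) before the components are joined with `:`, then splitting on `:` recovers them. The `:`-joined layout and the helper names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a filter-then-join identifier and its inverse. */
public class IdentifierRoundTrip {

    /** Same replacement as the CharacterFilter in MetricQueryService. */
    static String filter(String input) {
        return input.replaceAll("[ :.,]", "_");
    }

    /** Assumed identifier layout: filtered components joined with ':'. */
    static String join(String... components) {
        List<String> filtered = new ArrayList<>();
        for (String component : components) {
            filtered.add(filter(component));
        }
        return String.join(":", filtered);
    }

    /** Inverse of join: safe because no filtered component contains ':'. */
    static String[] split(String identifier) {
        return identifier.split(":");
    }
}
```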




[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75353675
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
+
+   /**
+* Adds a metric to this MetricStore.
+*
+* @param name  the metric identifier
+* @param value the metric value
+*/
+   public void add(String name, Object value) {
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   try {
+   String[] components = name.split(":");
--- End diff --

yes, the MetricQueryService does that




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426867#comment-15426867
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75353538
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
--- End diff --

yes




[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75353538
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
--- End diff --

yes




[jira] [Commented] (FLINK-4317) Restructure documentation

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426862#comment-15426862
 ] 

ASF GitHub Bot commented on FLINK-4317:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2387

[FLINK-4317, FLIP-3] [docs] Restructure docs

This is the initial layout for the docs restructuring as discussed in 
FLIP-3.

We move the top-level navigation to the left side and allow arbitrary 
nesting to be defined. The main goal is to help users find more of the important 
content. The docs therefore now treat streaming as the default and move 
some streaming-only content to the top level.

This kind of left-side navigation layout is also used by other popular 
projects such as the [Google Dataflow docs](https://cloud.google.com/dataflow/docs/) 
and http://readthedocs.org.

You can preview the new layout here: http://uce.github.io/flink/

We need to follow up with more actual content (see FLIP-3). If we adopt 
this layout, I will file the required content changes as JIRAs and ask someone 
to coordinate the writing process among the interested contributors.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2387


commit 724296b0376291e7f4a0e4031507f871f56395fa
Author: Ufuk Celebi 
Date:   2016-08-17T13:06:04Z

[FLINK-4317, FLIP-3] [docs] Restructure docs

- Add redirect layout
- Remove Maven artifact name warning
- Add info box if stable, but not latest
- Add font-awesome 4.6.3
- Add sidenav layout




> Restructure documentation
> -
>
> Key: FLINK-4317
> URL: https://issues.apache.org/jira/browse/FLINK-4317
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> - Documentation is not well grouped into categories according to what is 
> relevant in different steps of the development and deployment process
> - The current layout does not have enough top-level navigation menus to guide 
> people looking for docs. Experience shows that anything hidden in a larger 
> document linked from a sub-menu is overlooked by many people.
> Concepts
> - Overview: Similar to the current start page, stack and brief description
> - Concepts: Similar to what we currently have
> - Architecture: Process model (JM / TM / Client / …)
> - Project Structure: Core Maven artifacts, what is needed when, relationships
> Semantics
> Setup & Operations
> - Setup: Downloading, Building from Source, Hadoop Versions / Scala Versions
> - Deployment Options: Yarn, Mesos, Standalone, etc
> - Configuration
> - Failure & Recovery Model
> - Security Model
> Application Development
> - Quickstarts 
> - APIs: Streaming, Batch, Table, SQL
> - Event Time and Windowing Semantics
> - Libraries
> - State Backends
> - Connectors, End-to-end Exactly-once
> - Data Types: Types, Serialization (Custom, Kryo, Avro), Lambdas, Hints, 
> Extensibility
> Testing & Debugging
> - Testing and test utilities
> - Debugging
> - Monitoring: Web Frontend, Metrics
> - Tuning: Memory, CPU, GC
> - Operations: Version Upgrades, etc
> Internals
> - how does it work





[GitHub] flink pull request #2387: [FLINK-4317, FLIP-3] [docs] Restructure docs

2016-08-18 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2387

[FLINK-4317, FLIP-3] [docs] Restructure docs

This is the initial layout for the docs restructuring as discussed in 
FLIP-3.

We move the top-level navigation to the left side and allow arbitrary 
nesting to be defined. The main goal is to help users find more of the important 
content. The docs therefore now treat streaming as the default and move 
some streaming-only content to the top level.

This kind of left-side navigation layout is also used by other popular 
projects such as the [Google Dataflow docs](https://cloud.google.com/dataflow/docs/) 
and http://readthedocs.org.

You can preview the new layout here: http://uce.github.io/flink/

We need to follow up with more actual content (see FLIP-3). If we adopt 
this layout, I will file the required content changes as JIRAs and ask someone 
to coordinate the writing process among the interested contributors.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2387.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2387


commit 724296b0376291e7f4a0e4031507f871f56395fa
Author: Ufuk Celebi 
Date:   2016-08-17T13:06:04Z

[FLINK-4317, FLIP-3] [docs] Restructure docs

- Add redirect layout
- Remove Maven artifact name warning
- Add info box if stable, but not latest
- Add font-awesome 4.6.3
- Add sidenav layout






[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426838#comment-15426838
 ] 

ASF GitHub Bot commented on FLINK-4282:
---

Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2355
  
I have rearranged the code. It is really hard for me to add a space after 
every comma. Do you have a code style preset for IDEs so that I can reformat 
my new code automatically without changing other people's code?


> Add Offset Parameter to WindowAssigners
> ---
>
> Key: FLINK-4282
> URL: https://issues.apache.org/jira/browse/FLINK-4282
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Currently, windows are always aligned to the epoch, which basically means days 
> are aligned with GMT. This is somewhat problematic for people living in 
> different timezones.
> An offset parameter would allow adapting the window assigner to the timezone.
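Here is a minimal sketch of how an offset could shift window alignment, assuming tumbling windows and millisecond timestamps. `windowStart` is a hypothetical helper, not necessarily what the PR implements. With an offset of zero, daily windows start at midnight UTC. An offset of -8 hours makes them start at local midnight in UTC+8.

```java
/** Hypothetical sketch: tumbling-window alignment relative to an offset instead of the epoch. */
public class OffsetWindows {

    /**
     * Returns the start of the window containing {@code timestamp}.
     * All values are in milliseconds; windows start at offset + k * windowSize.
     */
    static long windowStart(long timestamp, long offset, long windowSize) {
        // floorMod (not %) keeps the result correct for timestamps before offset
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }
}
```

For example, 20:00 UTC with a one-day window and a -8 hour offset maps to a window starting at 16:00 UTC, which is midnight in UTC+8.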





[GitHub] flink issue #2355: [FLINK-4282]Add Offset Parameter to WindowAssigners

2016-08-18 Thread Renkai
Github user Renkai commented on the issue:

https://github.com/apache/flink/pull/2355
  
I have rearranged the code. It is really hard for me to add a space after 
every comma. Do you have a code style preset for IDEs so that I can reformat 
my new code automatically without changing other people's code?




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426830#comment-15426830
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2363
  
Thanks for your contribution @zentol. I've gone over the code and made some 
inline comments. My main concern/question is actually the representation of the 
metrics' type and hierarchy information. I think that encoding it in a string 
and then re-parsing it on the receiver side to reconstruct the information is 
rather fragile and error-prone, especially with respect to maintainability. Maybe you can 
give me some background on why you decided to do it this way.

Apart from that, I think the code contains many tests, which I really like 
:-)




[GitHub] flink issue #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2363
  
Thanks for your contribution @zentol. I've gone over the code and made some 
inline comments. My main concern/question is actually the representation of the 
metrics' type and hierarchy information. I think that encoding it in a string 
and then re-parsing it on the receiver side to reconstruct the information is 
rather fragile and error-prone, especially with respect to maintainability. Maybe you can 
give me some background on why you decided to do it this way.

Apart from that, I think the code contains many tests, which I really like 
:-)




[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-08-18 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2386

[FLINK-3660] Measure latency and exposes them via a metric

This commit adds the initial runtime support for measuring latency of 
records going through the system.

For this, I introduced a new StreamElement called a LatencyMarker.
Similar to Watermarks, LatencyMarkers are emitted from the sources at a 
configured interval. The default interval is 2000 ms, and setting the 
interval to 0 disables the emission of markers. 
LatencyMarkers cannot "overtake" regular elements. This ensures that the 
measured latency approximates the end-to-end latency of regular stream elements.
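The following sketch shows why sharing the channel prevents overtaking. It assumes a single FIFO output channel per source and a periodic tick that checks the interval. `MarkerEmittingSource` and `Element` are hypothetical, not the PR's classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch: records and latency markers share one FIFO channel. */
public class MarkerEmittingSource {

    /** Hypothetical stream element: either a record or a latency marker. */
    static final class Element {
        final String record;   // null for a latency marker
        final long markedTime; // marker creation time in ms, -1 for records

        Element(String record, long markedTime) {
            this.record = record;
            this.markedTime = markedTime;
        }

        boolean isMarker() {
            return record == null;
        }
    }

    private final long intervalMs; // 0 disables latency markers
    private long lastMarkerMs;
    final Queue<Element> channel = new ArrayDeque<>();

    MarkerEmittingSource(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastMarkerMs = startMs;
    }

    /** Regular records are appended in emission order. */
    void emitRecord(String record) {
        channel.add(new Element(record, -1L));
    }

    /**
     * Called periodically (for example by a timer). A marker is appended behind
     * every record emitted so far, so downstream it cannot overtake them.
     */
    void onTick(long nowMs) {
        if (intervalMs > 0 && nowMs - lastMarkerMs >= intervalMs) {
            channel.add(new Element(null, nowMs));
            lastMarkerMs = nowMs;
        }
    }
}
```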

Regular operators (excluding those participating in iterations) forward 
latency markers unless they are a sink.
Operators with multiple outputs randomly select one of them to forward the marker to. 
This ensures that every marker exists only once in the system, and that 
repartition steps do not cause an explosion in the number of transferred 
markers.
If an operator is a sink, it maintains the last 512 latencies from each 
known source instance.
The sink reports the min/max/mean/p50/p95/p99 latency of each known source 
using a special LatencyGauge (any operator without outputs acts as a sink).
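The following sketches the sink-side bookkeeping described above. It assumes markers carry their creation time and a source identifier. `LatencyHistory` is hypothetical, and the nearest-rank percentile is a choice made here, not necessarily the one `LatencyGauge` uses. Latencies are computed as `now - markedTime`, so they include any clock offset between the source and sink machines.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: last N latencies per source instance, plus summary statistics. */
public class LatencyHistory {
    private final int capacity; // 512 in the PR description
    private final Map<String, Deque<Long>> bySource = new HashMap<>();

    public LatencyHistory(int capacity) {
        this.capacity = capacity;
    }

    /** Records now - markedTime for the marker's source, evicting the oldest value when full. */
    public void onMarker(String sourceId, long markedTimeMs, long nowMs) {
        Deque<Long> window = bySource.computeIfAbsent(sourceId, k -> new ArrayDeque<>());
        if (window.size() == capacity) {
            window.removeFirst();
        }
        window.addLast(nowMs - markedTimeMs);
    }

    /** Returns {min, max, mean, p50, p95, p99} for one source, or an empty array if unknown. */
    public double[] stats(String sourceId) {
        List<Long> sorted = new ArrayList<>(bySource.getOrDefault(sourceId, new ArrayDeque<>()));
        if (sorted.isEmpty()) {
            return new double[0];
        }
        Collections.sort(sorted);
        double sum = 0;
        for (long v : sorted) {
            sum += v;
        }
        return new double[] {
            sorted.get(0),
            sorted.get(sorted.size() - 1),
            sum / sorted.size(),
            percentile(sorted, 0.50),
            percentile(sorted, 0.95),
            percentile(sorted, 0.99)
        };
    }

    /** Nearest-rank percentile: the ceil(q * n)-th smallest sample (1-based). */
    private static double percentile(List<Long> sorted, double q) {
        int rank = (int) Math.ceil(q * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }
}
```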

This commit does not visualize the latency in the web interface.
Also, there is currently no mechanism to ensure that the system clocks are 
in sync, so the latency measurements will be inaccurate if the hardware clocks 
of the machines are not synchronized.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink3660-pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2386








[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426825#comment-15426825
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2386

[FLINK-3660] Measure latency and exposes them via a metric

This commit adds the initial runtime support for measuring latency of 
records going through the system.

For this, I introduced a new StreamElement called a LatencyMarker.
Similar to Watermarks, LatencyMarkers are emitted from the sources at a 
configured interval. The default interval is 2000 ms, and setting the 
interval to 0 disables the emission of markers. 
LatencyMarkers cannot "overtake" regular elements. This ensures that the 
measured latency approximates the end-to-end latency of regular stream elements.

Regular operators (excluding those participating in iterations) forward 
latency markers unless they are a sink.
Operators with multiple outputs randomly select one of them to forward the marker to. 
This ensures that every marker exists only once in the system, and that 
repartition steps do not cause an explosion in the number of transferred 
markers.
If an operator is a sink, it maintains the last 512 latencies from each 
known source instance.
The sink reports the min/max/mean/p50/p95/p99 latency of each known source 
using a special LatencyGauge (any operator without outputs acts as a sink).

This commit does not visualize the latency in the web interface.
Also, there is currently no mechanism to ensure that the system clocks are 
in sync, so the latency measurements will be inaccurate if the hardware clocks 
of the machines are not synchronized.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink3660-pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2386






> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> web interface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> to each record at the sources.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest implementing this feature by periodically sending 
> special events through the topology, similar to watermarks. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but they will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> The above suggestion assumes that the clocks on all TaskManagers are in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> TaskManagers' clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset of their own clock.
> This offset would still include the network latency between TM and JM, but it 
> would still lead to reasonably good estimates.
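Here is a sketch of the offset estimate from the second step. Taking the midpoint of the request's round trip (as in Cristian's algorithm) is a refinement added here, not part of the proposal. It removes the symmetric part of the network latency but not any asymmetry between the two directions. `ClockOffsetEstimator` and its methods are hypothetical names.

```java
/** Hypothetical sketch: estimating a TaskManager's clock offset relative to the JobManager. */
public class ClockOffsetEstimator {

    /**
     * @param requestSentMs   local time when the query was sent
     * @param jmTimeMs        time reported by the JobManager
     * @param replyReceivedMs local time when the reply arrived
     * @return estimated (JobManager clock - local clock) in ms
     */
    static long estimateOffset(long requestSentMs, long jmTimeMs, long replyReceivedMs) {
        // assume the JM read its clock halfway through the round trip
        long midpoint = requestSentMs + (replyReceivedMs - requestSentMs) / 2;
        return jmTimeMs - midpoint;
    }

    /** Latency with both timestamps converted to the JobManager's time base. */
    static long latencyMs(long markedLocalMs, long sourceOffsetMs, long nowLocalMs, long sinkOffsetMs) {
        return (nowLocalMs + sinkOffsetMs) - (markedLocalMs + sourceOffsetMs);
    }
}
```

For example, a source clock running 500 ms ahead (offset -500) and a sink clock running 200 ms behind (offset +200) produce a raw difference of -660 ms for a marker that actually took 40 ms. After both timestamps are converted, the result is 40 ms.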





[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426822#comment-15426822
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75347741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricQueryService.java
 ---
@@ -0,0 +1,180 @@
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and receives the following messages:
+ * - {@code Tuple3<String, Metric, AbstractMetricGroup> => Notification of added Metric}
+ * - {@code Metric => Notification of removed Metric}
+ * - {@code ActorRef => Query for metric dump}
+ */
+public class MetricQueryService extends UntypedActor {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+   public static final byte CATEGORY_COUNTER = 0;
+   public static final byte CATEGORY_GAUGE = 1;
+   public static final byte CATEGORY_HISTOGRAM = 2;
+
+   private final Map gauges = new HashMap<>();
+   private final Map counters = new HashMap<>();
+   private final Map histograms = new HashMap<>();
+
+   private final List toRemove = new ArrayList<>();
+
+   @Override
+   public void onReceive(Object message) throws Exception {
+   try {
+   if (message instanceof Tuple3) { // add metric
+   Tuple3<String, Metric, AbstractMetricGroup> tuple = (Tuple3<String, Metric, AbstractMetricGroup>) message;
+
+   String metricName = tuple.f0;
+   Metric metric = tuple.f1;
+   AbstractMetricGroup group = tuple.f2;
+
+   String name = group.getQueryServiceMetricIdentifier(metricName, new CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replaceAll("[ :.,]", "_");
+   }
+   });
+
+   if (metric instanceof Counter) {
+   counters.put((Counter) metric, name);
+   } else if (metric instanceof Gauge) {
+   gauges.put((Gauge) metric, name);
+   } else if (metric instanceof Histogram) {
+   histograms.put((Histogram) metric, name);
+   }
+   } else if (message instanceof Metric) { // remove metric
+   toRemove.add((Metric) message);
+   } else if (message instanceof ActorRef) { // create dump
+   Object[] dump = createDump();
+   

[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75347741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricQueryService.java
 ---
@@ -0,0 +1,180 @@
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The MetricQueryService creates a key-value representation of all 
metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and receives the following messages:
+ * - {@code Tuple3 => Notification of 
added Metric}
+ * - {@code Metric => Notification of removed Metric}
+ * - {@code ActorRef => Query for metric dump}
+ */
+public class MetricQueryService extends UntypedActor {
+   private static final Logger LOG = 
LoggerFactory.getLogger(MetricQueryService.class);
+
+   public static final byte CATEGORY_COUNTER = 0;
+   public static final byte CATEGORY_GAUGE = 1;
+   public static final byte CATEGORY_HISTOGRAM = 2;
+
+   private final Map gauges = new HashMap<>();
+   private final Map counters = new HashMap<>();
+   private final Map histograms = new HashMap<>();
+
+   private final List toRemove = new ArrayList<>();
+
+   @Override
+   public void onReceive(Object message) throws Exception {
+   try {
+   if (message instanceof Tuple3) { // add metric
+   Tuple3 
tuple = (Tuple3) message;
+
+   String metricName = tuple.f0;
+   Metric metric = tuple.f1;
+   AbstractMetricGroup group = tuple.f2;
+
+   String name = 
group.getQueryServiceMetricIdentifier(metricName, new CharacterFilter() {
+   @Override
+   public String filterCharacters(String 
input) {
+   return input.replaceAll("[ 
:.,]", "_");
+   }
+   });
+
+   if (metric instanceof Counter) {
+   counters.put((Counter) metric, name);
+   } else if (metric instanceof Gauge) {
+   gauges.put((Gauge) metric, name);
+   } else if (metric instanceof Histogram) {
+   histograms.put((Histogram) metric, 
name);
+   }
+   } else if (message instanceof Metric) { // remove metric
+   toRemove.add((Metric) message);
+   } else if (message instanceof ActorRef) { // create dump
+   Object[] dump = createDump();
+   ((ActorRef) message).tell(dump, getSelf());
+   removeMetrics();
+   } else {
+   LOG.warn("MetricQueryServiceActor received an 
invalid message. " + 
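The three-message protocol in the MetricQueryService JavaDoc (add via `Tuple3`, remove via `Metric`, dump via `ActorRef`) can be modelled without Akka. This is a minimal, single-threaded sketch, not Flink's implementation. It makes the following assumptions:

* The hypothetical `QueryServiceSketch` stands in for the actor, and plain method calls replace `onReceive` dispatch.
* A `LongSupplier` stands in for a `Counter`.
* The sanitized metric name alone replaces `getQueryServiceMetricIdentifier`.

As in the diff, removals are buffered in `toRemove` and applied only after the next dump. This assumes `removeMetrics()`, whose body is not shown, purges the buffered metrics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the MetricQueryService actor; not Flink's API.
class QueryServiceSketch {
    // Metric object -> sanitized query name, like the counters map in the diff.
    private final Map<LongSupplier, String> counters = new LinkedHashMap<>();
    // Removed metrics are buffered and purged only after the next dump.
    private final List<LongSupplier> toRemove = new ArrayList<>();

    // Same character filter as the diff: spaces, colons, dots and commas become '_'.
    static String sanitize(String input) {
        return input.replaceAll("[ :.,]", "_");
    }

    // Corresponds to the Tuple3 "add metric" message.
    void add(String metricName, LongSupplier counter) {
        counters.put(counter, sanitize(metricName));
    }

    // Corresponds to the Metric "remove metric" message.
    void remove(LongSupplier counter) {
        toRemove.add(counter);
    }

    // Corresponds to the ActorRef "create dump" message: snapshot first, then purge.
    Map<String, Long> dump() {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<LongSupplier, String> entry : counters.entrySet()) {
            result.put(entry.getValue(), entry.getKey().getAsLong());
        }
        for (LongSupplier removed : toRemove) {
            counters.remove(removed);
        }
        toRemove.clear();
        return result;
    }
}
```

With this ordering, a metric that is removed between two queries still appears in exactly one more dump.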

[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426813#comment-15426813
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75346843
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1024,6 +1024,17 @@ class JobManager(
 
 case RequestWebMonitorPort =>
   sender() ! ResponseWebMonitorPort(webMonitorPort)
+
+case MetricRequest =>
+  metricsRegistry match {
+case Some(registry) =>
+  registry.getQueryService match {
+case Some(queryService) =>
+  queryService ! sender()
+case None =>
+  }
+case None =>
--- End diff --

In both `None` cases you should return a failure message to the sender so 
that it does not have to wait until the future times out.
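The following is a minimal sketch of this suggestion, written in plain Java rather than the Scala/Akka of the diff. When either the registry or its query service is absent, the handler replies immediately with an explicit failure, so a caller waiting on the reply gets an answer instead of a timeout.

`MetricRequestHandler`, `RegistryStub`, `DumpSource`, and the use of a `CompletableFuture` as the "sender" are hypothetical stand-ins, not Flink or Akka APIs. In Akka itself, replying with an `akka.actor.Status.Failure` is the usual way to fail the asker's future.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the metric query service.
interface DumpSource {
    String dump();
}

// Hypothetical stand-in for the registry; the query service may be missing.
class RegistryStub {
    private final DumpSource queryService;

    RegistryStub(DumpSource queryService) {
        this.queryService = queryService;
    }

    Optional<DumpSource> getQueryService() {
        return Optional.ofNullable(queryService);
    }
}

class MetricRequestHandler {
    // Mirrors the Option that is pattern-matched in the diff.
    private final Optional<RegistryStub> registry;

    MetricRequestHandler(Optional<RegistryStub> registry) {
        this.registry = registry;
    }

    // The future plays the role of sender(): every path must complete it.
    void handleMetricRequest(CompletableFuture<String> sender) {
        if (!registry.isPresent()) {
            sender.completeExceptionally(new IllegalStateException("No metric registry available."));
            return;
        }
        Optional<DumpSource> queryService = registry.get().getQueryService();
        if (!queryService.isPresent()) {
            sender.completeExceptionally(new IllegalStateException("Metric query service is not running."));
            return;
        }
        sender.complete(queryService.get().dump());
    }
}
```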


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75346843
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1024,6 +1024,17 @@ class JobManager(
 
 case RequestWebMonitorPort =>
   sender() ! ResponseWebMonitorPort(webMonitorPort)
+
+case MetricRequest =>
+  metricsRegistry match {
+case Some(registry) =>
+  registry.getQueryService match {
+case Some(queryService) =>
+  queryService ! sender()
+case None =>
+  }
+case None =>
--- End diff --

In both `None` cases you should return a failure message to the sender so 
that it does not have to wait until the future times out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75346742
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -47,4 +47,21 @@ object Messages {
* @return The Acknowledge case object instance.
*/
   def getAcknowledge(): Acknowledge.type = Acknowledge
+
+  /**
+* Signals that the receiver (JobManager/TaskManager) should transmit a 
dump of the
+* registered metrics.
+* 
+* This message may be sent regularly by the WebInterface.
+*/
+  case object MetricRequest
+
+  /**
+* Accessor for the case object instance, to simplify Java 
interoperability.
+*
+* @return The MetricRequest case object instance.
+*/
+  def getRequestMetrics(): AnyRef = {
--- End diff --

Can we name this method after the message?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426812#comment-15426812
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75346742
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala 
---
@@ -47,4 +47,21 @@ object Messages {
* @return The Acknowledge case object instance.
*/
   def getAcknowledge(): Acknowledge.type = Acknowledge
+
+  /**
+* Signals that the receiver (JobManager/TaskManager) should transmit a 
dump of the
+* registered metrics.
+* 
+* This message may be send regularly by the WebInterface.
+*/
+  case object MetricRequest
+
+  /**
+* Accessor for the case object instance, to simplify Java 
interoperability.
+*
+* @return The MetricRequest case object instance.
+*/
+  def getRequestMetrics(): AnyRef = {
--- End diff --

Can we name this method like the message?


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426808#comment-15426808
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75346431
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = 
LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
+
+   /**
+* Adds a metric to this MetricStore.
+*
+* @param name  the metric identifier
+* @param value the metric value
+*/
+   public void add(String name, Object value) {
--- End diff --

To be honest, I'm not a big fan of encoding type and hierarchical 
information in a string that has to be re-parsed to reconstruct that 
information. The problem with this approach is that everything is very implicit: 
there is no tight coupling (in terms of format) between the sender and the 
receiver. If something changes on the sender side, nothing tells you that 
something has to change here as well. Even at runtime, the only symptom is that 
the metrics don't show up, which makes it very hard to debug. I would be in favour 
of creating different messages for the different types, e.g. `JobManagerMetric`, 
`TaskManagerMetric`, `TaskMetric`, etc., each containing the respective 
information. Furthermore, in the histogram case, you would have to redo the whole 
parsing over and over again instead of doing it once.
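The typed-message alternative can be sketched as follows. This is a minimal sketch only: the class names `JobManagerMetric`, `TaskManagerMetric` and `TaskMetric` come from the comment, but their fields, the `MetricMessage` base class and `TypedMetricStore` are hypothetical.

Because each message carries its own structure, the store dispatches on the type instead of re-parsing a string. A `value` can be a structured object, such as histogram statistics, so it never needs to be parsed again. An unknown message type fails loudly instead of silently dropping metrics.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base message; the field layout is an assumption.
abstract class MetricMessage {
    final String name;
    final Object value;

    MetricMessage(String name, Object value) {
        this.name = name;
        this.value = value;
    }
}

final class JobManagerMetric extends MetricMessage {
    JobManagerMetric(String name, Object value) {
        super(name, value);
    }
}

final class TaskManagerMetric extends MetricMessage {
    final String taskManagerId;

    TaskManagerMetric(String taskManagerId, String name, Object value) {
        super(name, value);
        this.taskManagerId = taskManagerId;
    }
}

final class TaskMetric extends MetricMessage {
    final String jobId;
    final String taskId;

    TaskMetric(String jobId, String taskId, String name, Object value) {
        super(name, value);
        this.jobId = jobId;
        this.taskId = taskId;
    }
}

// The hierarchy comes from the message type, not from a parsed identifier string.
class TypedMetricStore {
    final Map<String, Object> jobManager = new HashMap<>();
    final Map<String, Map<String, Object>> taskManagers = new HashMap<>();
    final Map<String, Map<String, Map<String, Object>>> tasksByJob = new HashMap<>();

    void add(MetricMessage message) {
        if (message instanceof JobManagerMetric) {
            jobManager.put(message.name, message.value);
        } else if (message instanceof TaskManagerMetric) {
            TaskManagerMetric m = (TaskManagerMetric) message;
            taskManagers.computeIfAbsent(m.taskManagerId, k -> new HashMap<>()).put(m.name, m.value);
        } else if (message instanceof TaskMetric) {
            TaskMetric m = (TaskMetric) message;
            tasksByJob.computeIfAbsent(m.jobId, k -> new HashMap<>())
                .computeIfAbsent(m.taskId, k -> new HashMap<>())
                .put(m.name, m.value);
        } else {
            throw new IllegalArgumentException("Unknown metric message: " + message.getClass());
        }
    }
}
```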


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75346431
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = 
LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
+
+   /**
+* Adds a metric to this MetricStore.
+*
+* @param name  the metric identifier
+* @param value the metric value
+*/
+   public void add(String name, Object value) {
--- End diff --

To be honest, I'm not a very big fan of encoding type and hierarchical 
information in a string which has to be reparsed in order to reconstruct the 
afore-mentioned information. The problem with this approach is that everything 
is very implicit and you don't have a tight coupling (in terms of format) 
between the sender and receiver. If something changes at the sender side, you 
won't notice at all that you have to change here something as well. Even at 
runtime the only thing you see is that you don't see the metrics. This makes it 
very hard to debug. I would be in favour of creating for the different types 
different messages, e.g. `JobManagerMetric`, `TaskManagerMetric`, `TaskMetric`, 
etc. These messages contain the respective information. Furthermore, for the 
histogram type case, you will have to do the whole parsing over and over again 
instead of doing it once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426787#comment-15426787
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75344971
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 ---
@@ -61,9 +58,7 @@ public TaskManagerJobMetricGroup(
JobID jobId,
@Nullable String jobName) {
 
-   super(registry, jobId, jobName, scopeFormat.formatScope(parent, 
jobId, jobName));
-
-   this.parent = checkNotNull(parent);
+   super(registry, checkNotNull(parent), jobId, jobName, 
scopeFormat.formatScope(parent, jobId, jobName));
--- End diff --

Shouldn't the `checkNotNull` be performed by the super constructor?
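A minimal sketch of moving the null check into the base-class constructor, so that every subclass gets it for free. `ParentedGroup` and `JobGroupSketch` are hypothetical names, and the JDK's `Objects.requireNonNull` stands in for the `checkNotNull` used in the diff.

```java
import java.util.Objects;

// Hypothetical base class: the null check lives here, once, for all subclasses.
abstract class ParentedGroup<P> {
    protected final P parent;

    protected ParentedGroup(P parent) {
        this.parent = Objects.requireNonNull(parent, "parent must not be null");
    }
}

// Hypothetical subclass: forwards the parent without re-checking it.
final class JobGroupSketch extends ParentedGroup<String> {
    final String jobName;

    JobGroupSketch(String parent, String jobName) {
        super(parent);
        this.jobName = jobName;
    }
}
```

One caveat applies to the diff. Java evaluates all constructor arguments before the super constructor body runs, so `scopeFormat.formatScope(parent, jobId, jobName)` in the `super(...)` call would receive a null `parent` before a base-class check could fire. Today that call is guarded because `checkNotNull(parent)` appears earlier in the same argument list.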


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426786#comment-15426786
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75344832
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 ---
@@ -33,19 +36,42 @@
  * group could for example include the task attempt number (more fine 
grained identification), or
  * exclude it (for continuity of the namespace across failure and 
recovery).
  */
-public abstract class ComponentMetricGroup extends AbstractMetricGroup {
+public abstract class ComponentMetricGroup 
extends AbstractMetricGroup {
--- End diff --

Is this an internal component like the `JobMetricGroup`?


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75344971
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
 ---
@@ -61,9 +58,7 @@ public TaskManagerJobMetricGroup(
JobID jobId,
@Nullable String jobName) {
 
-   super(registry, jobId, jobName, scopeFormat.formatScope(parent, 
jobId, jobName));
-
-   this.parent = checkNotNull(parent);
+   super(registry, checkNotNull(parent), jobId, jobName, 
scopeFormat.formatScope(parent, jobId, jobName));
--- End diff --

shouldn't the `checkNotNull` be performed by the super constructor?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75344832
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 ---
@@ -33,19 +36,42 @@
  * group could for example include the task attempt number (more fine 
grained identification), or
  * exclude it (for continuity of the namespace across failure and 
recovery).
  */
-public abstract class ComponentMetricGroup extends AbstractMetricGroup {
+public abstract class ComponentMetricGroup 
extends AbstractMetricGroup {
--- End diff --

Is this an internal component like the `JobMetricGroup`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426784#comment-15426784
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75344502
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 ---
@@ -33,19 +36,42 @@
  * group could for example include the task attempt number (more fine 
grained identification), or
  * exclude it (for continuity of the namespace across failure and 
recovery).
  */
-public abstract class ComponentMetricGroup extends AbstractMetricGroup {
+public abstract class ComponentMetricGroup 
extends AbstractMetricGroup {
--- End diff --

The type parameter is missing from the JavaDoc description.
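The mail archive stripped the generic parameter from the quoted class declarations, so its real name and bound are unknown. In the sketch below, `P` and both class names are hypothetical placeholders.

JavaDoc documents a class's type parameter with an `@param <P>` tag in the class comment. The same tag would cover the equivalent gap in `AbstractMetricGroup`.

```java
/**
 * Hypothetical component group, shown only to illustrate type-parameter JavaDoc.
 *
 * @param <P> the type of this group's parent metric group
 */
abstract class ComponentGroupSketch<P> {
    private final P parent;

    /**
     * Creates a new component group.
     *
     * @param parent the parent metric group
     */
    protected ComponentGroupSketch(P parent) {
        this.parent = parent;
    }

    /**
     * @return the parent metric group of this component group
     */
    public P parent() {
        return parent;
    }
}

/** Hypothetical subclass that binds the type parameter to a concrete parent type. */
final class TaskManagerGroupSketch extends ComponentGroupSketch<String> {
    TaskManagerGroupSketch(String parent) {
        super(parent);
    }
}
```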


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75344502
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 ---
@@ -33,19 +36,42 @@
  * group could for example include the task attempt number (more fine 
grained identification), or
  * exclude it (for continuity of the namespace across failure and 
recovery).
  */
-public abstract class ComponentMetricGroup extends AbstractMetricGroup {
+public abstract class ComponentMetricGroup 
extends AbstractMetricGroup {
--- End diff --

Type parameter in the description is missing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426783#comment-15426783
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r7534
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 ---
@@ -54,13 +53,19 @@
  * return Counters, Gauges, etc to the code, to prevent exceptions in the 
monitored code.
  * These metrics simply do not get reported any more, when created on a 
closed group.
  */
-public abstract class AbstractMetricGroup implements MetricGroup {
+public abstract class AbstractMetricGroup 
implements MetricGroup {
--- End diff --

The type parameter description is missing from the JavaDocs.


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r7534
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 ---
@@ -54,13 +53,19 @@
  * return Counters, Gauges, etc to the code, to prevent exceptions in the 
monitored code.
  * These metrics simply do not get reported any more, when created on a 
closed group.
  */
-public abstract class AbstractMetricGroup implements MetricGroup {
+public abstract class AbstractMetricGroup 
implements MetricGroup {
--- End diff --

Type parameter description in the JavaDocs is missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426776#comment-15426776
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75343567
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -207,6 +239,9 @@ public void register(Metric metric, String metricName, 
MetricGroup group) {
}
}
}
+   if (queryService != null) {
+   notifyOfAddedMetric(queryService, metric, 
metricName, (AbstractMetricGroup) group);
--- End diff --

I think it's always good practice to prefix a static method call with the class 
that defines it. Otherwise, it's easy to assume that the method is defined 
somewhere in this class. Consequently, I would somewhat discourage static 
imports.
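A minimal illustration of the qualified-call style. The diff does not show where `notifyOfAddedMetric` is defined, presumably `MetricQueryService`, so `QueryServiceNotifier` and `RegistrySketch` are hypothetical names, and the simplified signature is an assumption.

```java
// Hypothetical utility class holding the static helper.
final class QueryServiceNotifier {
    private QueryServiceNotifier() {
    }

    static String notifyOfAddedMetric(String metricName) {
        return "added:" + metricName;
    }
}

final class RegistrySketch {
    String register(String metricName) {
        // Qualified call: the reader sees immediately that the method lives in
        // QueryServiceNotifier, rather than guessing it is defined in this class.
        return QueryServiceNotifier.notifyOfAddedMetric(metricName);
    }
}
```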


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75343567
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -207,6 +239,9 @@ public void register(Metric metric, String metricName, 
MetricGroup group) {
}
}
}
+   if (queryService != null) {
+   notifyOfAddedMetric(queryService, metric, 
metricName, (AbstractMetricGroup) group);
--- End diff --

I think it's always good practice to add the class whose static method on 
calls in front of the static method. Otherwise you easily think that this 
method is somewhere defined in this class. Consequently, I would discourage 
static imports a little bit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426774#comment-15426774
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75343353
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -144,6 +152,30 @@ public MetricRegistry(Configuration config) {
}
}
 
+   /**
+* Initializes the MetricQueryService.
+* 
+* @param actorSystem ActorSystem to create the MetricQueryService on
+ */
+   public void startQueryService(ActorSystem actorSystem) {
+   try {
+   queryService = 
MetricQueryService.startMetricQueryService(actorSystem);
+   } catch (Exception e) {
+   LOG.warn("Could not start MetricDumpActor. No metrics 
will be submitted to the WebInterface.", e);
+   }
+   }
+
+   /**
+* Returns an ActorRef to the MetricQueryService
+* 
+* @return ActorRef to the MetricQueryService
+ */
+   public Option getQueryService() {
+   return queryService == null
--- End diff --

This is equivalent to `Option.apply(queryService)`. If `queryService == null`, 
the `apply` call returns `None`.
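Scala's `Option.apply(x)` returns `None` for `null` and `Some(x)` otherwise. It can also be called from Java, so the explicit null check collapses to a single call.

To keep the sketch free of a Scala dependency, it uses the JDK's `Optional.ofNullable`, which has the same null-to-empty semantics. `QueryServiceHolder` is a hypothetical stand-in for the registry, and a `String` stands in for the `ActorRef`.

```java
import java.util.Optional;

// Hypothetical holder; the field stays null when starting the service failed.
final class QueryServiceHolder {
    private final String queryService;

    QueryServiceHolder(String queryService) {
        this.queryService = queryService;
    }

    // Verbose form with an explicit null check, as in the diff.
    Optional<String> getQueryServiceVerbose() {
        return queryService == null ? Optional.empty() : Optional.of(queryService);
    }

    // Equivalent one-liner: the Java analogue of Option.apply(queryService).
    Optional<String> getQueryService() {
        return Optional.ofNullable(queryService);
    }
}
```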


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75343353
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -144,6 +152,30 @@ public MetricRegistry(Configuration config) {
}
}
 
+   /**
+* Initializes the MetricQueryService.
+* 
+* @param actorSystem ActorSystem to create the MetricQueryService on
+ */
+   public void startQueryService(ActorSystem actorSystem) {
+   try {
+   queryService = 
MetricQueryService.startMetricQueryService(actorSystem);
+   } catch (Exception e) {
+   LOG.warn("Could not start MetricDumpActor. No metrics 
will be submitted to the WebInterface.", e);
+   }
+   }
+
+   /**
+* Returns an ActorRef to the MetricQueryService
+* 
+* @return ActorRef to the MetricQueryService
+ */
+   public Option getQueryService() {
+   return queryService == null
--- End diff --

this is equivalent to `Option.apply(queryService)`. If queryService == 
null, then the apply call will return a `None`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426770#comment-15426770
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75343063
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricQueryService.java
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and receives the following messages:
+ * - {@code Tuple3<String, Metric, AbstractMetricGroup> => Notification of added Metric}
+ * - {@code Metric => Notification of removed Metric}
+ * - {@code ActorRef => Query for metric dump}
+ */
+public class MetricQueryService extends UntypedActor {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+   public static final byte CATEGORY_COUNTER = 0;
+   public static final byte CATEGORY_GAUGE = 1;
+   public static final byte CATEGORY_HISTOGRAM = 2;
+
+   private final Map<Gauge<?>, String> gauges = new HashMap<>();
+   private final Map<Counter, String> counters = new HashMap<>();
+   private final Map<Histogram, String> histograms = new HashMap<>();
+
+   private final List toRemove = new ArrayList<>();
+
+   @Override
+   public void onReceive(Object message) throws Exception {
+   try {
+   if (message instanceof Tuple3) { // add metric
+   Tuple3<String, Metric, AbstractMetricGroup> tuple = (Tuple3<String, Metric, AbstractMetricGroup>) message;
+
+   String metricName = tuple.f0;
+   Metric metric = tuple.f1;
+   AbstractMetricGroup group = tuple.f2;
+
+   String name = group.getQueryServiceMetricIdentifier(metricName, new CharacterFilter() {
+   @Override
+   public String filterCharacters(String input) {
+   return input.replaceAll("[ :.,]", "_");
+   }
+   });
+
+   if (metric instanceof Counter) {
+   counters.put((Counter) metric, name);
+   } else if (metric instanceof Gauge) {
+   gauges.put((Gauge) metric, name);
+   } else if (metric instanceof Histogram) {
+   histograms.put((Histogram) metric, name);
+   }
+   } else if (message instanceof Metric) { // remove metric
--- End diff --

The same applies to the remove-metric message and the create-dump message. Both 
should get proper message types, e.g. `RemoveMetric` and `CreateDump`.
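A minimal sketch of the suggestion, assuming plain Java with no Akka: each request gets its own small message class, so the receiver dispatches on intent rather than on the payload's runtime type (per the class Javadoc above, a bare `Metric` currently means "remove" and a bare `ActorRef` means "dump"). `RemoveMetric` and `CreateDump` follow the names in the comment; everything else (`MetricQueryServiceSketch`, the `String`-keyed registry, the dump format) is hypothetical and stands in for the real actor and metric types.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical message: asks the service to drop a registered metric. */
final class RemoveMetric {
    final String name;

    RemoveMetric(String name) {
        this.name = name;
    }
}

/** Hypothetical message: asks the service for a dump. It carries no data, so one instance suffices. */
final class CreateDump {
    static final CreateDump INSTANCE = new CreateDump();

    private CreateDump() {
    }
}

/** Plain-Java stand-in for the actor's onReceive dispatch (no Akka involved). */
class MetricQueryServiceSketch {
    // metric name -> current value; TreeMap keeps the dump order deterministic
    private final Map<String, Long> metrics = new TreeMap<>();

    void register(String name, long value) {
        metrics.put(name, value);
    }

    /** Returns the dump for CreateDump and null otherwise; a real actor would reply to the sender. */
    String onReceive(Object message) {
        if (message instanceof RemoveMetric) {
            metrics.remove(((RemoveMetric) message).name);
            return null;
        } else if (message instanceof CreateDump) {
            return metrics.toString();
        } else {
            // An UntypedActor would typically call unhandled(message) here
            throw new IllegalArgumentException("Unknown message: " + message);
        }
    }
}
```

Because the message class carries the meaning, a new request type added later cannot be confused with an existing branch.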


> Expose metrics to Webfrontend
> -
>
>  

[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75343063
  
The same applies to the remove-metric message and the create-dump message. Both 
should get proper message types, e.g. `RemoveMetric` and `CreateDump`.



[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426764#comment-15426764
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75342927
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricQueryService.java
 ---
@@ -0,0 +1,180 @@
+package org.apache.flink.runtime.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * It is realized as an actor and receives the following messages:
+ * - {@code Tuple3<String, Metric, AbstractMetricGroup> => Notification of added Metric}
+ * - {@code Metric => Notification of removed Metric}
+ * - {@code ActorRef => Query for metric dump}
+ */
+public class MetricQueryService extends UntypedActor {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+   public static final byte CATEGORY_COUNTER = 0;
+   public static final byte CATEGORY_GAUGE = 1;
+   public static final byte CATEGORY_HISTOGRAM = 2;
+
+   private final Map<Gauge<?>, String> gauges = new HashMap<>();
+   private final Map<Counter, String> counters = new HashMap<>();
+   private final Map<Histogram, String> histograms = new HashMap<>();
+
+   private final List toRemove = new ArrayList<>();
+
+   @Override
+   public void onReceive(Object message) throws Exception {
+   try {
+   if (message instanceof Tuple3) { // add metric
--- End diff --

I think we should create proper message types instead of using a `Tuple3` 
here.
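For the add case, a dedicated message replaces positional `f0`/`f1`/`f2` access with named, final fields, and the receiver checks for a class that cannot be confused with any other `Tuple3` sent to the actor. The sketch below is hypothetical: `AddMetric` is an illustrative name (the review only asks for proper message types), and the type parameters `M` and `G` stand in for Flink's `Metric` and `AbstractMetricGroup` only so the example compiles on its own.

```java
import java.util.Objects;

/**
 * Hypothetical replacement for {@code Tuple3<String, Metric, AbstractMetricGroup>}.
 * M and G only stand in for Flink's Metric and AbstractMetricGroup so that
 * the sketch compiles on its own; real code would use those types directly.
 */
final class AddMetric<M, G> {
    private final String metricName;
    private final M metric;
    private final G group;

    AddMetric(String metricName, M metric, G group) {
        // Fail fast at the sender instead of discovering a null field inside the actor
        this.metricName = Objects.requireNonNull(metricName, "metricName");
        this.metric = Objects.requireNonNull(metric, "metric");
        this.group = Objects.requireNonNull(group, "group");
    }

    String getMetricName() {
        return metricName;
    }

    M getMetric() {
        return metric;
    }

    G getGroup() {
        return group;
    }
}

/** Dispatch on the message class instead of on Tuple3 and positional fields. */
class AddMetricReceiver {
    static String describe(Object message) {
        if (message instanceof AddMetric) {
            AddMetric<?, ?> add = (AddMetric<?, ?>) message;
            return add.getMetricName() + " in " + add.getGroup();
        }
        return "unhandled";
    }
}
```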


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75342927
  
I think we should create proper message types instead of using a `Tuple3` 
here.




[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75340401
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricHandlerTest.java
 ---
@@ -0,0 +1,158 @@
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricHandlerTest {
--- End diff --

`extends TestLogger` missing




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426740#comment-15426740
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75340930
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricHandlerTest.java
 ---
@@ -0,0 +1,158 @@
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricHandlerTest {
--- End diff --

`extends TestLogger` should be added to all tests, because it makes debugging from Travis logs 
considerably easier.
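The benefit is that each test brackets its own log output with start and end markers, so when many tests write into the same CI log (here on Travis), an error line can be traced back to the test that produced it. In Flink, extending `TestLogger` gives a JUnit test class this behaviour. The plain-Java sketch below only illustrates the idea without a JUnit dependency; `LoggedTestRunner`, `runLogged` and the marker wording are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java illustration of per-test start/end markers in a shared log. */
class LoggedTestRunner {
    // Collected log lines; a real setup would write through SLF4J/log4j instead
    final List<String> log = new ArrayList<>();

    /** Runs one test body and brackets everything it logs with START and PASS/FAIL markers. */
    boolean runLogged(String testName, Runnable body) {
        log.add("START " + testName);
        try {
            body.run();
            log.add("PASS " + testName);
            return true;
        } catch (RuntimeException | AssertionError e) {
            log.add("FAIL " + testName + ": " + e.getMessage());
            return false;
        }
    }
}
```

Any line between `START x` and the matching `PASS x` or `FAIL x` belongs to test `x`, which is what makes interleaved CI output readable.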




[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75340930
  
`extends TestLogger` should be added to all tests, because it makes debugging from Travis logs 
considerably easier.




[jira] [Commented] (FLINK-3866) StringArraySerializer claims type is immutable; shouldn't

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426739#comment-15426739
 ] 

ASF GitHub Bot commented on FLINK-3866:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2289
  
Thanks for the fix @mushketyk!

Looks good to merge


> StringArraySerializer claims type is immutable; shouldn't
> -
>
> Key: FLINK-3866
> URL: https://issues.apache.org/jira/browse/FLINK-3866
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Looking at default `TypeSerializer` instances, I noticed what looks like a 
> minor flaw, unless I am missing something.
> Whereas all other array serializers indicate that the type is not immutable 
> (since in Java, arrays are mutable), `StringArraySerializer` has:
> ```
>   @Override
>   public boolean isImmutableType() {
>   return true;
>   }
> ```
> and I think it should instead return `false`. I could create a PR, but it seems 
> like a small enough change that an issue report makes more sense.
> I tried looking through the deps to see if there's a test for this, but couldn't find 
> one; otherwise I could submit a test fix.
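The reporter's reasoning can be made concrete in plain Java. `isImmutableType()` signals whether instances of a type may safely be shared rather than copied, so returning `true` is only safe if no one ever writes into the value. The sketch below is a hypothetical stand-in (`StringArrayCopier`, not Flink's serializer class): each `String` element is immutable, but the array holding the elements is not, so a correct copy must allocate a new array.

```java
import java.util.Arrays;

/** Hypothetical illustration of copy semantics for String[] (not Flink's StringArraySerializer). */
class StringArrayCopier {
    /** What treating the type as immutable allows: handing out the same reference. */
    static String[] share(String[] from) {
        return from;
    }

    /**
     * A correct copy for a mutable array: a new array object. Copying the element
     * references is enough because each String is itself immutable.
     */
    static String[] copy(String[] from) {
        return Arrays.copyOf(from, from.length);
    }
}
```

Writing to the original array after the call is visible through the shared reference but not in the copy, which is why an array serializer should report `false`.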





[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426738#comment-15426738
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75340742
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricHandlerTest.java
 ---
@@ -0,0 +1,158 @@
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricHandlerTest {
+   @Test
+   public void testHandleRequest() throws Exception {
+   MetricFetcher fetcher = new MetricFetcher(mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+   MetricStore store = fetcher.getMetricStore();
+
+   JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+   store.add("0:abc.metric1", 0);
+   store.add("1:tmid:abc.metric2", 1);
+   store.add("2:jobid:abc.metric3", 2);
+   store.add("3:jobid:taskid:subindex:abc.metric4", 3);
+   store.add("4:jobid:taskid:subindex:opname:abc.metric5", 4);
+
+   Map<String, String> pathParams = new HashMap<>();
+   Map<String, String> queryParams = new HashMap<>();
+
+   pathParams.put("jobid", "jobid");
+   pathParams.put("vertexid", "taskid");
+
+   String availableList = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\"}," +
+   "{\"id\":\"subindex.abc.metric4\"}" +
+   "]",
+   availableList);
+
+   queryParams.put("get", "subindex.opname.abc.metric5");
+
+   String metricValue = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\",\"value\":\"4\"}" +
+   "]"
+   , metricValue
+   );
+
+   queryParams.put("get", "subindex.opname.abc.metric5,subindex.abc.metric4");
+
+   String metricValues = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\",\"value\":\"4\"}," +
+   "{\"id\":\"subindex.abc.metric4\",\"value\":\"3\"}" +
+   "]",
+   metricValues
+   );
+   }
+
+   @Test
+   public void testInvalidListDoesNotFail() {
+   MetricFetcher fetcher = new MetricFetcher(mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+   MetricStore store = fetcher.getMetricStore();
+
+   JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+   store.add("0:abc.metric1", 0);
+   store.add("1:tmid:abc.metric2", 1);
+   store.add("2:jobid:abc.metric3", 2);
+   store.add("3:jobid:taskid:subindex:abc.metric4", 3);
+   store.add("4:jobid:taskid:subindex:opname:abc.metric5", 4);
+
+   Map<String, String> pathParams = new HashMap<>();
+   Map<String, String> queryParams = new HashMap<>();
+
+   pathParams.put("jobid", "jobid");
+   pathParams.put("vertexid", 

[GitHub] flink issue #2289: [FLINK-3866] StringArraySerializer type should be mutable

2016-08-18 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2289
  
Thanks for the fix @mushketyk!

Looks good to merge




[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75340742
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricHandlerTest.java
 ---
@@ -0,0 +1,158 @@
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricHandlerTest {
+   @Test
+   public void testHandleRequest() throws Exception {
+   MetricFetcher fetcher = new MetricFetcher(mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+   MetricStore store = fetcher.getMetricStore();
+
+   JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+   store.add("0:abc.metric1", 0);
+   store.add("1:tmid:abc.metric2", 1);
+   store.add("2:jobid:abc.metric3", 2);
+   store.add("3:jobid:taskid:subindex:abc.metric4", 3);
+   store.add("4:jobid:taskid:subindex:opname:abc.metric5", 4);
+
+   Map<String, String> pathParams = new HashMap<>();
+   Map<String, String> queryParams = new HashMap<>();
+
+   pathParams.put("jobid", "jobid");
+   pathParams.put("vertexid", "taskid");
+
+   String availableList = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\"}," +
+   "{\"id\":\"subindex.abc.metric4\"}" +
+   "]",
+   availableList);
+
+   queryParams.put("get", "subindex.opname.abc.metric5");
+
+   String metricValue = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\",\"value\":\"4\"}" +
+   "]"
+   , metricValue
+   );
+
+   queryParams.put("get", "subindex.opname.abc.metric5,subindex.abc.metric4");
+
+   String metricValues = handler.handleRequest(pathParams, queryParams, null);
+
+   assertEquals("[" +
+   "{\"id\":\"subindex.opname.abc.metric5\",\"value\":\"4\"}," +
+   "{\"id\":\"subindex.abc.metric4\",\"value\":\"3\"}" +
+   "]",
+   metricValues
+   );
+   }
+
+   @Test
+   public void testInvalidListDoesNotFail() {
+   MetricFetcher fetcher = new MetricFetcher(mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+   MetricStore store = fetcher.getMetricStore();
+
+   JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+
+   store.add("0:abc.metric1", 0);
+   store.add("1:tmid:abc.metric2", 1);
+   store.add("2:jobid:abc.metric3", 2);
+   store.add("3:jobid:taskid:subindex:abc.metric4", 3);
+   store.add("4:jobid:taskid:subindex:opname:abc.metric5", 4);
+
+   Map<String, String> pathParams = new HashMap<>();
+   Map<String, String> queryParams = new HashMap<>();
+
+   pathParams.put("jobid", "jobid");
+   pathParams.put("vertexid", "taskid");
+
+   //-invalid variable
+   pathParams.put("jobid", "nonexistant");
+
+   try {
+   handler.handleRequest(pathParams, queryParams, null);
--- End diff --
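The test data encodes each metric's scope in its key: a leading category number, then colon-separated scope components, then the metric name (`3:jobid:taskid:subindex:abc.metric4`). Judging only from the expected output, categories 0 to 2 appear to be job manager, task manager and job scope, and the vertex handler keeps the task (3) and operator (4) entries whose job and vertex IDs match the path parameters, exposing the remaining components joined with dots. The sketch below reproduces that mapping; `VertexMetricKeys.vertexMetricId` is a hypothetical helper inferred from this test, not the `MetricStore` or `JobVertexMetricsHandler` implementation.

```java
import java.util.Arrays;

/** Hypothetical parser for the "category:scope...:name" keys used in the test data. */
class VertexMetricKeys {
    /**
     * Returns the vertex-scoped ID for a task (3) or operator (4) key that
     * belongs to the given job and vertex, or null for any other key.
     */
    static String vertexMetricId(String key, String jobId, String vertexId) {
        String[] parts = key.split(":");
        if (parts.length < 4) {
            return null; // too few components for task or operator scope (judging by the test data)
        }
        String category = parts[0];
        if (!category.equals("3") && !category.equals("4")) {
            return null;
        }
        if (!parts[1].equals(jobId) || !parts[2].equals(vertexId)) {
            return null; // metric belongs to a different job or vertex
        }
        // Remaining components: subtask index, optional operator name, metric name
        return String.join(".", Arrays.copyOfRange(parts, 3, parts.length));
    }
}
```

For example, `vertexMetricId("4:jobid:taskid:subindex:opname:abc.metric5", "jobid", "taskid")` yields `subindex.opname.abc.metric5`, matching the ID the test expects.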

  

[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426732#comment-15426732
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75340401
  
`extends TestLogger` missing


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426734#comment-15426734
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75340445
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricHandlerTest.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricHandlerTest {
+   @Test
+   public void testHandleRequest() throws Exception {
--- End diff --

A short description would be helpful




[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75340445
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricHandlerTest.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+public class AbstractMetricHandlerTest {
+   @Test
+   public void testHandleRequest() throws Exception {
--- End diff --

A short description would be helpful


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4414) Remove restriction on RpcService.getAddress

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426728#comment-15426728
 ] 

ASF GitHub Bot commented on FLINK-4414:
---

Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/2381
  
Coming back to the initial motivation of this PR: I tried to get an 
address from the RpcService, which provides the API 
`getAddress(RpcGateway gateway)`, but it can only provide the address of an 
endpoint created on that service. 
Another solution, just for discussion:
1. Change the function prototype to `getAddress(RpcEndpoint endpoint)`.
2. The AkkaRpcService can adapt to this change easily, since we can get the 
actorRef from the RpcEndpoint and do what is done now. In addition, 
AkkaRpcService needs to check whether the actorRef is among the cached actors 
and raise an exception if not.
This way we cannot get an address from a gateway, but that also makes sense: 
the endpoint is a server, which must have an address to be reached, while the 
gateway is a client, for which providing an address is optional.
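The proposal above can be sketched with plain-Java stand-ins. `Endpoint`, `SketchRpcService` and the `rpc://host:port/name` address format are hypothetical placeholders, not Flink's `RpcEndpoint` or `AkkaRpcService`. The map of started endpoints plays the role of the cached actors.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a server-side RPC endpoint (not Flink's RpcEndpoint).
final class Endpoint {
    final String name;

    Endpoint(String name) {
        this.name = name;
    }
}

// Hypothetical service implementing the proposed getAddress(endpoint) contract.
final class SketchRpcService {
    private final String hostPort;
    // Endpoints started on this service, mapped to their addresses
    // (playing the role of the AkkaRpcService's cached actors).
    private final Map<Endpoint, String> startedEndpoints = new HashMap<>();

    SketchRpcService(String hostPort) {
        this.hostPort = hostPort;
    }

    Endpoint startServer(String name) {
        Endpoint endpoint = new Endpoint(name);
        startedEndpoints.put(endpoint, "rpc://" + hostPort + "/" + name);
        return endpoint;
    }

    // Only endpoints started on this service can be resolved; anything else is an error.
    String getAddress(Endpoint endpoint) {
        String address = startedEndpoints.get(endpoint);
        if (address == null) {
            throw new IllegalArgumentException(
                    "Endpoint '" + endpoint.name + "' was not started by this RPC service.");
        }
        return address;
    }

    public static void main(String[] args) {
        SketchRpcService service = new SketchRpcService("localhost:6123");
        Endpoint rm = service.startServer("resourcemanager");
        System.out.println(service.getAddress(rm)); // rpc://localhost:6123/resourcemanager
    }
}
```

In this variant a gateway has no address of its own. Only endpoints started by the service can be resolved, which matches the server/client distinction argued for in the comment.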


> Remove restriction on RpcService.getAddress
> ---
>
> Key: FLINK-4414
> URL: https://issues.apache.org/jira/browse/FLINK-4414
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>
> Currently, {{RpcService}} provides only the address of an endpoint. I think the 
> RPC service serves both the endpoints created on it and the remote gateways 
> created on it, so it is fine to offer getAddress for every {{RpcGateway}} 
> created on the RPC service, including both server and client.
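For comparison, here is a sketch of the issue's original idea, in which every gateway created by the service (server or client) can report its address. `Gateway`, `GatewayTrackingRpcService`, `startServer`, `connect` and the address format are all hypothetical.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical gateway handle; stands in for an RpcGateway proxy.
final class Gateway {
    final String target;

    Gateway(String target) {
        this.target = target;
    }
}

// Hypothetical service that remembers the address of every gateway it hands out.
final class GatewayTrackingRpcService {
    private final String hostPort;
    // IdentityHashMap: gateways are tracked by instance, not by equals()
    private final Map<Gateway, String> addresses = new IdentityHashMap<>();

    GatewayTrackingRpcService(String hostPort) {
        this.hostPort = hostPort;
    }

    // Starts a local endpoint and returns its self gateway (server side).
    Gateway startServer(String name) {
        return register(new Gateway(name), "rpc://" + hostPort + "/" + name);
    }

    // Connects to a remote endpoint and returns a gateway to it (client side).
    Gateway connect(String remoteAddress) {
        return register(new Gateway(remoteAddress), remoteAddress);
    }

    String getAddress(Gateway gateway) {
        String address = addresses.get(gateway);
        if (address == null) {
            throw new IllegalArgumentException("Gateway was not created by this RPC service.");
        }
        return address;
    }

    private Gateway register(Gateway gateway, String address) {
        addresses.put(gateway, address);
        return gateway;
    }

    public static void main(String[] args) {
        GatewayTrackingRpcService service = new GatewayTrackingRpcService("localhost:6123");
        Gateway client = service.connect("rpc://remote:6123/taskmanager");
        System.out.println(service.getAddress(client)); // rpc://remote:6123/taskmanager
    }
}
```

Keying on the gateway instance means two proxies pointing at the same target are still tracked separately. The cost is that the service must remember every client gateway it creates.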





[GitHub] flink issue #2381: [FLINK-4414][cluster management] Remove restriction on Rp...

2016-08-18 Thread wenlong88
Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/2381
  
Coming back to the initial motivation of this PR: I tried to get an 
address from the RpcService, which provides the API 
`getAddress(RpcGateway gateway)`, but it can only provide the address of an 
endpoint created on that service. 
Another solution, just for discussion:
1. Change the function prototype to `getAddress(RpcEndpoint endpoint)`.
2. The AkkaRpcService can adapt to this change easily, since we can get the 
actorRef from the RpcEndpoint and do what is done now. In addition, 
AkkaRpcService needs to check whether the actorRef is among the cached actors 
and raise an exception if not.
This way we cannot get an address from a gateway, but that also makes sense: 
the endpoint is a server, which must have an address to be reached, while the 
gateway is a client, for which providing an address is optional.




[jira] [Commented] (FLINK-4231) Switch DistinctOperator from GroupReduce to Reduce

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426725#comment-15426725
 ] 

ASF GitHub Bot commented on FLINK-4231:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2272
  
Thanks for the PR @greghogan. I added a few comments. 
Looks good to merge otherwise. 

Thanks, Fabian


> Switch DistinctOperator from GroupReduce to Reduce
> --
>
> Key: FLINK-4231
> URL: https://issues.apache.org/jira/browse/FLINK-4231
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> As discussed in FLINK-3279, rewriting {{DistinctOperator}} to a 
> {{ReduceFunction}} rather than the current {{GroupReduceFunction}} allows the 
> user to set the {{CombineHint}} and choose a hash-based, sort-based, or no 
> combiner.
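The idea behind this change can be illustrated in plain Java. This sketch does not use the Flink DataSet API or the real `CombineHint`. Distinct is treated as a reduce whose function keeps either of two records with the same key. Because that function is associative, it can be pre-applied by a hash-based or a sort-based combiner. `DistinctAsReduce`, `keepFirst`, `hashCombine` and `sortCombine` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Conceptual sketch: "distinct" as a reduce, with two local combine strategies.
final class DistinctAsReduce {
    // Reduce function for distinct: any record of a group is a valid result.
    static <T> BinaryOperator<T> keepFirst() {
        return (a, b) -> a;
    }

    // Hash-based combine: one hash-table entry per distinct key.
    static <T, K> List<T> hashCombine(List<T> input, Function<T, K> key, BinaryOperator<T> reduce) {
        Map<K, T> table = new LinkedHashMap<>();
        for (T record : input) {
            table.merge(key.apply(record), record, reduce);
        }
        return new ArrayList<>(table.values());
    }

    // Sort-based combine: sort by key, then reduce each run of equal keys.
    static <T, K extends Comparable<K>> List<T> sortCombine(
            List<T> input, Function<T, K> key, BinaryOperator<T> reduce) {
        List<T> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing(key));
        List<T> out = new ArrayList<>();
        for (T record : sorted) {
            int last = out.size() - 1;
            if (last >= 0 && key.apply(out.get(last)).equals(key.apply(record))) {
                out.set(last, reduce.apply(out.get(last), record));
            } else {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("b", "a", "b", "c", "a");
        Function<String, String> identity = w -> w;
        BinaryOperator<String> keep = keepFirst();
        System.out.println(hashCombine(words, identity, keep)); // [b, a, c]
        System.out.println(sortCombine(words, identity, keep)); // [a, b, c]
    }
}
```

Choosing no combiner corresponds to skipping both helpers and sending every record to the final reduce. A hash-based combine tends to pay off when there are few distinct keys relative to the number of records.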





[GitHub] flink issue #2272: [FLINK-4231] [java] Switch DistinctOperator from GroupRed...

2016-08-18 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2272
  
Thanks for the PR @greghogan. I added a few comments. 
Looks good to merge otherwise. 

Thanks, Fabian




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426720#comment-15426720
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75339017
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
+
+   /**
+* Adds a metric to this MetricStore.
+*
+* @param name  the metric identifier
+* @param value the metric value
+*/
+   public void add(String name, Object value) {
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   try {
+   String[] components = name.split(":");
--- End diff --

Do we filter `:` out of user-defined metric names?
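The concern can be made concrete. If a user-defined name contains `:`, then `split(":")` yields an extra component, and the identifier no longer has the field count its category implies. The helper below shows one possible mitigation: replace `:` in user-supplied components before joining them. `ColonInMetricNames`, `sanitize` and `taskMetricId` are hypothetical. The leading `3` is the task category as used in the test data.

```java
import java.util.Arrays;

// Hypothetical illustration of the ':' delimiter problem and one mitigation.
final class ColonInMetricNames {
    // Replace the delimiter in a user-supplied component before it is joined.
    static String sanitize(String userComponent) {
        return userComponent.replace(':', '_');
    }

    // Builds a task-scoped identifier "3:job:task:subtask:name" with a sanitized name.
    static String taskMetricId(String jobId, String taskId, int subtask, String metricName) {
        return String.join(":", "3", jobId, taskId, Integer.toString(subtask), sanitize(metricName));
    }

    public static void main(String[] args) {
        String raw = "3:jobid:taskid:0:latency:p99";          // user named the metric "latency:p99"
        System.out.println(raw.split(":").length);            // 6, but a task metric needs 5 fields
        String safe = taskMetricId("jobid", "taskid", 0, "latency:p99");
        System.out.println(Arrays.toString(safe.split(":"))); // [3, jobid, taskid, 0, latency_p99]
    }
}
```

Splitting with a limit (`split(":", n)`) would only protect the last component. Operator names can also be set by users and sit in the middle of operator-scoped identifiers, so sanitizing when the identifier is created is the more complete option.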




[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75339017
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
+
+   /**
+* Adds a metric to this MetricStore.
+*
+* @param name  the metric identifier
+* @param value the metric value
+*/
+   public void add(String name, Object value) {
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   try {
+   String[] components = name.split(":");
--- End diff --

Do we filter `:` out of user-defined metric names?




[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426718#comment-15426718
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r75338668
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+   private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
+
+   JobManagerMetricStore jobManager = new JobManagerMetricStore();
+   Map taskManagers = new HashMap<>();
+   Map jobs = new HashMap<>();
--- End diff --

Can these fields be final?
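Here is a sketch of what the suggestion amounts to. The maps are created once and only mutated in place, so the references can be `final`. That documents the intent and turns accidental reassignment into a compile error, while the maps themselves stay mutable. The field types are simplified stand-ins, because the generic parameters are not visible in the quoted diff. `FinalFieldsStore` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store: value types stand in for the PR's
// JobManagerMetricStore / TaskManagerMetricStore classes.
final class FinalFieldsStore {
    // final: each reference is assigned exactly once; the maps remain mutable.
    final Map<String, Object> jobManager = new HashMap<>();
    final Map<String, Map<String, Object>> taskManagers = new HashMap<>();

    void addTaskManagerMetric(String tmId, String name, Object value) {
        // Mutating the contents is allowed; `taskManagers = ...` would not compile.
        taskManagers.computeIfAbsent(tmId, id -> new HashMap<>()).put(name, value);
    }

    public static void main(String[] args) {
        FinalFieldsStore store = new FinalFieldsStore();
        store.addTaskManagerMetric("tmid", "abc.metric2", 1);
        System.out.println(store.taskManagers); // {tmid={abc.metric2=1}}
    }
}
```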




[jira] [Commented] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426717#comment-15426717
 ] 

ASF GitHub Bot commented on FLINK-4253:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2342
  
@uce 
Got a green build. Any feedback/reviews here? I know you guys are busy, 
just a gentle reminder.


> Rename "recovery.mode" config key to "high-availability"
> 
>
> Key: FLINK-4253
> URL: https://issues.apache.org/jira/browse/FLINK-4253
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: ramkrishna.s.vasudevan
>
> Currently, HA is configured via the following configuration keys:
> {code}
> recovery.mode: STANDALONE // No high availability (HA)
> recovery.mode: ZOOKEEPER // HA
> {code}
> This could be more straightforward by simply renaming the key to 
> {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded. We 
> already have standalone cluster mode.
> {code}
> high-availability: NONE // No HA
> high-availability: ZOOKEEPER // HA via ZooKeeper
> {code}
> The {{recovery.mode}} configuration keys would have to be deprecated before 
> completely removing them.
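One way to handle the deprecation period described above is to read the new key first and fall back to the old one, mapping the old `STANDALONE` value to `NONE`. The sketch below works on a plain `Map` and is not Flink's `Configuration` API. `HaModeResolver` and the default of `NONE` when neither key is set are assumptions.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical resolver: prefers the new key, falls back to the deprecated one.
final class HaModeResolver {
    static final String NEW_KEY = "high-availability";
    static final String DEPRECATED_KEY = "recovery.mode";

    static String resolve(Map<String, String> config) {
        String value = config.get(NEW_KEY);
        if (value == null) {
            value = config.get(DEPRECATED_KEY);
            if (value != null) {
                System.err.println("Config key '" + DEPRECATED_KEY
                        + "' is deprecated; use '" + NEW_KEY + "' instead.");
            }
        }
        if (value == null) {
            return "NONE"; // assumed default: no high availability
        }
        String mode = value.trim().toUpperCase(Locale.ROOT);
        // Accept the old value STANDALONE as an alias for NONE.
        return mode.equals("STANDALONE") ? "NONE" : mode;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(DEPRECATED_KEY, "ZOOKEEPER");
        System.out.println(resolve(config)); // warns on stderr, then prints ZOOKEEPER
    }
}
```

If both keys are set, the new key wins. This keeps old configuration files working while new files opt in to `high-availability`.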





[GitHub] flink issue #2342: FLINK-4253 - Rename "recovery.mode" config key to "high-a...

2016-08-18 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2342
  
@uce 
Got a green build. Any feedback/reviews here? I know you guys are busy, 
just a gentle reminder.



