[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438223#comment-16438223
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r181540905
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

How about this way: set up the cluster with one slot per TM, and use 
`host:pid` to act as the "`allocationID`"?
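The suggestion above can be sketched with standard JDK APIs. This is a minimal, hypothetical helper (the class and method names are illustrative, not part of Flink): on HotSpot JVMs, `RuntimeMXBean#getName` typically returns `pid@hostname`, which is enough to build a `host:pid` token when each TaskManager holds exactly one slot.

```java
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class SlotIdentifier {

    // Hypothetical helper: derive a "host:pid" string that can stand in for
    // the allocation ID when each TaskManager has exactly one slot.
    public static String hostAndPid() {
        // RuntimeMXBean#getName typically returns "pid@hostname" on HotSpot JVMs.
        String jvmName = ManagementFactory.getRuntimeMXBean().getName();
        String pid = jvmName.split("@")[0];
        String host;
        try {
            host = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            host = "localhost"; // fall back if the host name cannot be resolved
        }
        return host + ":" + pid;
    }

    public static void main(String[] args) {
        System.out.println(hostAndPid());
    }
}
```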


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-04-13 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r181540905
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

How about this way: set up the cluster with one slot per TM, and use 
`host:pid` to act as the "`allocationID`"?


---


[jira] [Updated] (FLINK-9173) RestClient - Received response is abnormal

2018-04-13 Thread Bob Lau (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Lau updated FLINK-9173:
---
Description: 
The system prints the exception log as follows:

 
{code:java}
// code placeholder
09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR 
o.a.flink.runtime.rest.RestClient - Received response was neither of the 
expected type ([simple type, class 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) nor 
an error. 
Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
 Unrecognized field "status" (class 
org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as 
ignorable (one known property: "errors"])
at [Source: N/A; line: -1, column: -1] (through reference chain: 
org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3779)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2050)
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2547)
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:210)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
 

 In a development environment such as Eclipse Luna, the application's job can 
be submitted to the standalone cluster via the Spring Boot application's main 
method, but `mvn spring-boot:run` prints this exception.

The local operating system is macOS; the JDK version is 1.8.0_151.
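The root cause visible in the log is Jackson's strict bean mapping: `ErrorResponseBody` declares only an `errors` property, so a payload carrying a `status` field fails with `UnrecognizedPropertyException`. A minimal reproduction follows, assuming plain `jackson-databind` on the classpath (Flink bundles a shaded copy under `org.apache.flink.shaded.jackson2`) and a hypothetical `ErrorBody` class standing in for `ErrorResponseBody`:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import java.util.List;

public class UnknownFieldDemo {

    // Mirrors ErrorResponseBody: the mapper only knows the "errors" property.
    public static class ErrorBody {
        public List<String> errors;
    }

    public static boolean failsOnUnknownField() {
        ObjectMapper mapper = new ObjectMapper();
        try {
            // "status" is not a property of ErrorBody, so strict mapping fails,
            // just like ErrorResponseBody["status"] in the log above.
            mapper.readValue("{\"status\": 404}", ErrorBody.class);
            return false;
        } catch (UnrecognizedPropertyException e) {
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsOnUnknownField());
    }
}
```

Relaxing `DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES` would make the mapper ignore the extra field, though the real fix is for client and server to agree on the response schema.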

 

  was: (earlier revision of the description: the same exception log, truncated 
in this digest)

[jira] [Updated] (FLINK-9173) RestClient - Received response is abnormal

2018-04-13 Thread Bob Lau (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Lau updated FLINK-9173:
---
Component/s: flink-shaded.git

> RestClient - Received response is abnormal
> --
>
> Key: FLINK-9173
> URL: https://issues.apache.org/jira/browse/FLINK-9173
> Project: Flink
>  Issue Type: Bug
>  Components: flink-shaded.git, Job-Submission, REST
>Affects Versions: 1.5.0
> Environment: OS:    CentOS 6.8
> JAVA:    1.8.0_161-b12
> maven-plugin:     spring-boot-maven-plugin
> Spring-boot:      1.5.10.RELEASE
>Reporter: Bob Lau
>Priority: Major
>
> The system prints the exception log as follows:
>  
> {code:java}
> // code placeholder
> 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR 
> o.a.flink.runtime.rest.RestClient - Received response was neither of the 
> expected type ([simple type, class 
> org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) 
> nor an error. 
> Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
>  Unrecognized field "status" (class 
> org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as 
> ignorable (one known property: "errors"])
> at [Source: N/A; line: -1, column: -1] (through reference chain: 
> org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3779)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2050)
> at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2547)
> at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225)
> at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:210)
> at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  In a development environment such as Eclipse Luna, the application's job can 
> be submitted to the standalone cluster.
> The local operating system is macOS; the JDK version is 1.8.0_151.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9173) RestClient - Received response is abnormal

2018-04-13 Thread Bob Lau (JIRA)
Bob Lau created FLINK-9173:
--

 Summary: RestClient - Received response is abnormal
 Key: FLINK-9173
 URL: https://issues.apache.org/jira/browse/FLINK-9173
 Project: Flink
  Issue Type: Bug
  Components: Job-Submission, REST
Affects Versions: 1.5.0
 Environment: OS:    CentOS 6.8

JAVA:    1.8.0_161-b12

maven-plugin:     spring-boot-maven-plugin

Spring-boot:      1.5.10.RELEASE
Reporter: Bob Lau


The description quotes the same exception log shown in full above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9157) Support Apache HCatalog in SQL client

2018-04-13 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438151#comment-16438151
 ] 

Rong Rong commented on FLINK-9157:
--

Hi [~dmitry_kober], yes, that would be great! For now, feel free to take a look 
at the related tasks; this is currently blocked by FLINK-9170. Once the 
integration with the Table/SQL API is done, we can probably start on the 
HCatalog integration. 

> Support Apache HCatalog in SQL client
> -
>
> Key: FLINK-9157
> URL: https://issues.apache.org/jira/browse/FLINK-9157
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Priority: Major
>
> Have the SQL-Client support its first external catalog, Apache HCatalog, out 
> of the box.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9172) Support external catalog factory that comes default with SQL-Client

2018-04-13 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9172:
-
Issue Type: Sub-task  (was: New Feature)
Parent: FLINK-7594

> Support external catalog factory that comes default with SQL-Client
> ---
>
> Key: FLINK-9172
> URL: https://issues.apache.org/jira/browse/FLINK-9172
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Priority: Major
>
> It would be great to have the SQL-Client support some external catalogs 
> out-of-the-box for SQL users to configure and utilize easily. I am currently 
> thinking of an external catalog factory that spins up both streaming and 
> batch external catalog table sources and sinks. This could greatly unify and 
> provide easy access for SQL users. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9172) Support external catalog factory that comes default with SQL-Client

2018-04-13 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9172:


 Summary: Support external catalog factory that comes default with 
SQL-Client
 Key: FLINK-9172
 URL: https://issues.apache.org/jira/browse/FLINK-9172
 Project: Flink
  Issue Type: New Feature
Reporter: Rong Rong


It would be great to have the SQL client support some external catalogs 
out-of-the-box for SQL users to configure and utilize easily. I am currently 
thinking of having an external catalog factory that spins up both streaming and 
batch external catalog table sources and sinks. This could greatly unify and 
simplify access for SQL users. 
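A purely illustrative sketch of what such an out-of-the-box catalog factory could look like. All names here (`IllustrativeCatalog`, `IllustrativeCatalogFactory`, `InMemoryCatalogFactory`) are hypothetical and are not part of the Flink Table API; the ticket does not specify a design.

```java
import java.util.List;
import java.util.Map;

// Hypothetical names for illustration only; the real design would live in the
// Flink Table API and is not specified by this ticket.
interface IllustrativeCatalog {
    List<String> listTables();
}

interface IllustrativeCatalogFactory {
    // e.g. "hcatalog"; the SQL client could match this against a config entry
    String catalogType();

    IllustrativeCatalog create(Map<String, String> properties);
}

// A trivial in-memory implementation showing how the factory would be used.
class InMemoryCatalogFactory implements IllustrativeCatalogFactory {
    @Override
    public String catalogType() {
        return "in-memory";
    }

    @Override
    public IllustrativeCatalog create(Map<String, String> properties) {
        // Table names are taken from a comma-separated config property.
        List<String> tables = List.of(properties.getOrDefault("tables", "").split(","));
        return () -> tables;  // IllustrativeCatalog has a single abstract method
    }
}
```

The point of the factory indirection is that the SQL client could instantiate catalogs purely from configuration, without users writing any code.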



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9157) Support Apache HCatalog in SQL client

2018-04-13 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9157:
-
Description: Having SQL-Client to support first external catalog: Apache 
HCatalog out of the box.  (was: It will be great to have SQL-Client to support 
some external catalogs out-of-the-box for SQL users to configure and utilize 
easily. Such as Apache HCatalog. )

> Support Apache HCatalog in SQL client
> -
>
> Key: FLINK-9157
> URL: https://issues.apache.org/jira/browse/FLINK-9157
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> Have the SQL client support its first external catalog, Apache HCatalog, out 
> of the box.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438133#comment-16438133
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181534458
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+	public AggregatingSubtasksMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			Executor executor,
+			MetricFetcher fetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+	}
+
+	@Override
+	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+		JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+		Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+		if (subtaskRanges.isEmpty()) {
+			return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
+		} else {
+			Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+			for (int subtask : subtasks) {
+				subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
+			}
+			return subtaskStores;
+		}
+	}
+
+	private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) 
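The subtask selection syntax described in the Javadoc above (a comma-separated list of integer ranges such as `0-2,4-5`) can be sketched independently of Flink. `parseRanges` below is a hypothetical, simplified stand-in for the handler's `getIntegerRangeFromString`; it skips the iterator machinery and error handling of the real code.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of parsing a comma-separated list of inclusive integer ranges. */
public class SubtaskRangeParser {

    // Hypothetical helper mirroring the idea behind getIntegerRangeFromString.
    static List<Integer> parseRanges(String raw) {
        List<Integer> result = new ArrayList<>();
        for (String range : raw.split(",")) {
            int dash = range.indexOf('-');
            if (dash == -1) {
                result.add(Integer.parseInt(range));  // single subtask index
            } else {
                int start = Integer.parseInt(range.substring(0, dash));
                int end = Integer.parseInt(range.substring(dash + 1));
                for (int i = start; i <= end; i++) {
                    result.add(i);  // ranges are inclusive on both ends
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseRanges("0-2,4-5"));  // prints [0, 1, 2, 4, 5]
    }
}
```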

[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438130#comment-16438130
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181534395
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
 ---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
--- End diff --

Ah yes, I copied it at the start but never got around to deleting the old one.


> Port AbstractAggregatingMetricsHandler to RestServerEndpoint
> 
>
> Key: FLINK-8370
> URL: https://issues.apache.org/jira/browse/FLINK-8370
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port subclasses of 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractAggregatingMetricsHandler}}
>  to new FLIP-6 {{RestServerEndpoint}}.
> The following handlers need to be migrated:
> * {{AggregatingJobsMetricsHandler}}
> * {{AggregatingSubtasksMetricsHandler}}
> * {{AggregatingTaskManagersMetricsHandler}}
> New handlers should then be registered in {{WebMonitorEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438129#comment-16438129
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181534220
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -49,15 +54,17 @@
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
  */
-public class MiniClusterResource extends ExternalResource {
+public class MiniClusterResource extends ExternalResource implements JobExecutor {
--- End diff --

Otherwise we have to revert to using a `MiniCluster` directly, but then we 
lose the `@Rule` benefits.


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438128#comment-16438128
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181534097
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 ---
@@ -196,21 +188,23 @@ public void 
testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw
// the test also ensures that user specific exceptions are 
serializable between JobManager <--> JobClient.
PackagedProgram streamingCheckpointedProg = new 
PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
 
-   TestStreamEnvironment.setAsContext(
-   testCluster,
+   TestEnvironment.setAsContext(
+   MINI_CLUSTER_RESOURCE,
parallelism,
Collections.singleton(new 
Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
Collections.emptyList());
 
-   // Program should terminate with a 'SuccessException':
-   // we can not access the SuccessException here when executing 
the tests with maven, because its not available in the jar.
-   expectedException.expectCause(
-   Matchers.hasProperty("cause",
-   hasProperty("class",
-   hasProperty("canonicalName", equalTo(
-   
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException");
-
-   streamingCheckpointedProg.invokeInteractiveModeForExecution();
+   try {
+   
streamingCheckpointedProg.invokeInteractiveModeForExecution();
+   } catch (Exception e) {
+   // Program should terminate with a 'SuccessException':
+   // we can not access the SuccessException here when 
executing the tests with maven, because its not available in the jar.
+			Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e,
+				candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"));
--- End diff --

Note that this test is a bit funky at the moment; it should not be possible 
to access the exception at all since it shouldn't be deserializable. I had one 
version where the predicate version of `findThrowable` actually failed since it 
couldn't deserialize the exception, but this issue for some reason disappeared 
after a rebase.
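The name-based matching used here — locating an exception in a cause chain by its canonical class name, so the test works even when the class is only in the user-jar and not on the test classpath — can be sketched as follows. This is a simplified, hypothetical stand-in for the predicate variant of Flink's `ExceptionUtils.findThrowable`; it only walks the plain cause chain.

```java
import java.util.Optional;

class CauseChainMatcher {

    /** Walks the cause chain of t and returns the first throwable whose
     *  canonical class name matches. The matched class need not be loadable
     *  by this code's class loader, since only the name is compared. */
    static Optional<Throwable> findByCanonicalName(Throwable t, String canonicalName) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (canonicalName.equals(current.getClass().getCanonicalName())) {
                return Optional.of(current);
            }
        }
        return Optional.empty();
    }
}
```

Note this sketch does not guard against cyclic cause chains, which the real utility would have to consider.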


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438125#comment-16438125
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181533944
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() {
 
@Override
public void before() throws Exception {
+		miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get());
--- End diff --

That's not "easy", and it doesn't even solve the issue.

The contained `TemporaryFolder` would only be available after the 
construction of the `MiniClusterResource`; the folder is, however, necessary to 
create the configuration of the `MiniClusterResource`. Naturally we would like 
both of these to be `Rules`, but that's not possible unless the configuration 
is generated lazily, as I propose in the PR. Due to the `RuleChain`, at the time 
that `MiniClusterResource#before` is called we can already access the 
`TemporaryFolder`.

The alternative is for the `TemporaryFolder` to be set up in a 
`@BeforeClass` method which is then used for the configuration. That 
would totally work, but it is really unfortunate, as setting temporary 
paths is rather common and required for virtually all tests that use file-based 
state backends.

I agree that my proposal is quite hacky, but the question we have to ask is 
whether we prefer more complexity in the tests or in the `MiniClusterResource`, 
where I would prefer the latter.
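The lazy-configuration idea argued for here can be sketched without JUnit. `LazilyConfiguredResource` below is a hypothetical stand-in for `MiniClusterResource` (the real class takes a full configuration object, not a `String`): the resource stores only a `Supplier` and resolves it in `before()`, by which point an outer rule such as a `TemporaryFolder` has already been set up.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in for MiniClusterResource: the configuration is supplied
// lazily, so it can depend on state created by earlier rules in a RuleChain.
class LazilyConfiguredResource {
    private final Supplier<String> configurationSupplier;
    private String configuration;  // resolved in before(), not in the constructor

    LazilyConfiguredResource(Supplier<String> configurationSupplier) {
        this.configurationSupplier = configurationSupplier;
    }

    void before() {
        // Mirrors the line under review: resolve lazily and fail fast on null.
        configuration = Objects.requireNonNull(configurationSupplier.get());
    }

    String getConfiguration() {
        return configuration;
    }
}
```

In a `RuleChain`, the outer rule's `before()` runs first, so by the time this resource evaluates its supplier, the temporary folder already exists.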


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438119#comment-16438119
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181533175
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 ---
@@ -196,21 +188,23 @@ public void 
testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw
// the test also ensures that user specific exceptions are 
serializable between JobManager <--> JobClient.
PackagedProgram streamingCheckpointedProg = new 
PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
 
-   TestStreamEnvironment.setAsContext(
-   testCluster,
+   TestEnvironment.setAsContext(
+   MINI_CLUSTER_RESOURCE,
parallelism,
Collections.singleton(new 
Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
Collections.emptyList());
 
-   // Program should terminate with a 'SuccessException':
-   // we can not access the SuccessException here when executing 
the tests with maven, because its not available in the jar.
-   expectedException.expectCause(
-   Matchers.hasProperty("cause",
-   hasProperty("class",
-   hasProperty("canonicalName", equalTo(
-   
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException");
-
-   streamingCheckpointedProg.invokeInteractiveModeForExecution();
+   try {
+   
streamingCheckpointedProg.invokeInteractiveModeForExecution();
+   } catch (Exception e) {
+   // Program should terminate with a 'SuccessException':
+   // we can not access the SuccessException here when 
executing the tests with maven, because its not available in the jar.
+			Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e,
+				candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"));
--- End diff --

Technically that shouldn't be possible. The exception class is part of the 
user-jar which is not on the classpath of the test.


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438117#comment-16438117
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181532978
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -49,15 +54,17 @@
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
  */
-public class MiniClusterResource extends ExternalResource {
+public class MiniClusterResource extends ExternalResource implements JobExecutor {
--- End diff --

so that we can pass the MiniClusterResource to 
`TestEnvironment.setAsContext()`


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Updated] (FLINK-9157) Support Apache HCatalog in SQL client

2018-04-13 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-9157:
--
Summary: Support Apache HCatalog in SQL client  (was: Support for commonly 
used external catalog)

> Support Apache HCatalog in SQL client
> -
>
> Key: FLINK-9157
> URL: https://issues.apache.org/jira/browse/FLINK-9157
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> It would be great for the SQL Client to support some external catalogs, such 
> as Apache HCatalog, out of the box, so that SQL users can configure and 
> utilize them easily.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-1466) Add InputFormat to read HCatalog tables

2018-04-13 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-1466:
--
Issue Type: Sub-task  (was: New Feature)
Parent: FLINK-9171

> Add InputFormat to read HCatalog tables
> ---
>
> Key: FLINK-1466
> URL: https://issues.apache.org/jira/browse/FLINK-1466
> Project: Flink
>  Issue Type: Sub-task
>  Components: Java API, Scala API
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Minor
>
> HCatalog is a metadata repository and InputFormat to make Hive tables 
> accessible to other frameworks such as Pig.
> Adding support for HCatalog would give access to Hive managed data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9157) Support for commonly used external catalog

2018-04-13 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-9157:
--
Issue Type: Bug  (was: Sub-task)
Parent: (was: FLINK-7594)

> Support for commonly used external catalog
> --
>
> Key: FLINK-9157
> URL: https://issues.apache.org/jira/browse/FLINK-9157
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> It would be great for the SQL Client to support some external catalogs, such 
> as Apache HCatalog, out of the box, so that SQL users can configure and 
> utilize them easily.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9157) Support for commonly used external catalog

2018-04-13 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-9157:
--
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-9171

> Support for commonly used external catalog
> --
>
> Key: FLINK-9157
> URL: https://issues.apache.org/jira/browse/FLINK-9157
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> It would be great for the SQL Client to support some external catalogs, such 
> as Apache HCatalog, out of the box, so that SQL users can configure and 
> utilize them easily.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-1913) Document how to access data in HCatalog

2018-04-13 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-1913:
--
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-9171

> Document how to access data in HCatalog
> ---
>
> Key: FLINK-1913
> URL: https://issues.apache.org/jira/browse/FLINK-1913
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, flink-hcatalog
>Reporter: Robert Metzger
>Assignee: Zhenqiu Huang
>Priority: Major
>
> Reading from HCatalog was added in FLINK-1466, but not documented
> We should document how to use the code in {{flink-hcatalog}}.
> Also, there should be an example on how to write to HCatalog using the Hadoop 
> wrappers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9170) HCatolog integration with Table/SQL API

2018-04-13 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-9170:
--
Issue Type: Sub-task  (was: New Feature)
Parent: FLINK-9171

> HCatolog integration with Table/SQL API
> ---
>
> Key: FLINK-9170
> URL: https://issues.apache.org/jira/browse/FLINK-9170
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Shuyi Chen
>Assignee: Zhenqiu Huang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9171) Flink HCatolog integration

2018-04-13 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-9171:
-

 Summary: Flink HCatolog integration 
 Key: FLINK-9171
 URL: https://issues.apache.org/jira/browse/FLINK-9171
 Project: Flink
  Issue Type: Task
Reporter: Shuyi Chen


This is a parent task for all HCatalog related integration in Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9170) HCatolog integration with Table/SQL API

2018-04-13 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-9170:
-

 Summary: HCatolog integration with Table/SQL API
 Key: FLINK-9170
 URL: https://issues.apache.org/jira/browse/FLINK-9170
 Project: Flink
  Issue Type: New Feature
  Components: Table API  SQL
Reporter: Shuyi Chen
Assignee: Zhenqiu Huang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-04-13 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-9169:
--

Assignee: Tzu-Li (Gordon) Tai

> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> A user reported observing the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> a {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose 
> {{stateSerializer}} can be {{null}}. That in itself is not the problem; 
> however, in {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, 
> where we null-check the state serializer. This then fails with an 
> uninformative NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.
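The complaint is that the failure surfaces as a bare NPE rather than a diagnosable error. A minimal sketch of a precondition check with a descriptive message (class name and message text here are illustrative, not Flink's actual `Preconditions`):

```java
public class Preconditions {

    // Null-check that fails fast with a descriptive message instead of a
    // bare NullPointerException with no explanation.
    public static <T> T checkNotNull(T ref, String message) {
        if (ref == null) {
            throw new NullPointerException(message);
        }
        return ref;
    }

    public static void main(String[] args) {
        try {
            // Hypothetical restore path: serializer failed to deserialize.
            checkNotNull(null,
                "The state serializer could not be deserialized from the savepoint; "
                    + "is the serializer class present on the classpath?");
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```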



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9103) SSL verification on TaskManager when parallelism > 1

2018-04-13 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9103.
--
   Resolution: Fixed
Fix Version/s: 1.4.3
   1.5.0

Fixed via
master: ffb03821ff118b0949d7d42d6b67312ee8732c2b
1.5.0: 688630c6432dd3318936613a3f657f7de475fce7
1.4.3: e76b10d07c657bcf3250ca08b5649c6a242bb01f

> SSL verification on TaskManager when parallelism > 1
> 
>
> Key: FLINK-9103
> URL: https://issues.apache.org/jira/browse/FLINK-9103
> Project: Flink
>  Issue Type: Bug
>  Components: Docker, Network, Security
>Affects Versions: 1.4.0
>Reporter: Edward Rojas
>Assignee: Edward Rojas
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
> Attachments: job.log, task0.log
>
>
> In dynamic environments like Kubernetes, the SSL certificates can be 
> generated to use only the DNS addresses for validation of the identity of 
> servers, given that the IP can change eventually.
>  
> In these cases, when executing jobs with parallelism set to 1, SSL 
> validation succeeds and the JobManager can communicate with the TaskManager, 
> and vice versa.
>  
> But with parallelism set to more than 1, SSL validation fails when 
> TaskManagers communicate with each other, as it appears to validate against 
> the IP address:
> Caused by: java.security.cert.CertificateException: No subject alternative 
> names matching IP address 172.xx.xxx.xxx found 
> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168) 
> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94) 
> at 
> sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
>  
> at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1601)
>  
> ... 21 more 
>  
> From the logs, it seems the task managers successfully register their full 
> addresses with Netty, but the IP is still used.
>  
> Attached pertinent logs from JobManager and a TaskManager. 
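The failure mode can be illustrated with a deliberately simplified sketch of SAN matching: a certificate that carries only DNS subject-alternative names can never match a peer identified by a raw IP. (Real JSSE hostname checking also handles wildcards and IP-type SANs; the class and host names below are illustrative.)

```java
import java.util.Arrays;
import java.util.List;

public class SanMatcher {

    // Accept a peer identified by `host` only if `host` appears among the
    // certificate's DNS subject-alternative names (case-insensitive match).
    public static boolean matchesDnsSan(String host, List<String> dnsSans) {
        return dnsSans.stream().anyMatch(san -> san.equalsIgnoreCase(host));
    }

    public static void main(String[] args) {
        // Certificate issued for the Kubernetes service DNS name only.
        List<String> sans = Arrays.asList("flink-taskmanager.default.svc");
        System.out.println(matchesDnsSan("flink-taskmanager.default.svc", sans));
        // Peer addressed by pod IP: no DNS SAN can match, validation fails.
        System.out.println(matchesDnsSan("172.17.0.5", sans));
    }
}
```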



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9103) SSL verification on TaskManager when parallelism > 1

2018-04-13 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9103:


Assignee: Edward Rojas

> SSL verification on TaskManager when parallelism > 1
> 
>
> Key: FLINK-9103
> URL: https://issues.apache.org/jira/browse/FLINK-9103
> Project: Flink
>  Issue Type: Bug
>  Components: Docker, Network, Security
>Affects Versions: 1.4.0
>Reporter: Edward Rojas
>Assignee: Edward Rojas
>Priority: Major
> Attachments: job.log, task0.log
>
>
> In dynamic environments like Kubernetes, the SSL certificates can be 
> generated to use only the DNS addresses for validation of the identity of 
> servers, given that the IP can change eventually.
>  
> In these cases, when executing jobs with parallelism set to 1, SSL 
> validation succeeds and the JobManager can communicate with the TaskManager, 
> and vice versa.
>  
> But with parallelism set to more than 1, SSL validation fails when 
> TaskManagers communicate with each other, as it appears to validate against 
> the IP address:
> Caused by: java.security.cert.CertificateException: No subject alternative 
> names matching IP address 172.xx.xxx.xxx found 
> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168) 
> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94) 
> at 
> sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
>  
> at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1601)
>  
> ... 21 more 
>  
> From the logs, it seems the task managers successfully register their full 
> addresses with Netty, but the IP is still used.
>  
> Attached pertinent logs from JobManager and a TaskManager. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9103) SSL verification on TaskManager when parallelism > 1

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437853#comment-16437853
 ] 

ASF GitHub Bot commented on FLINK-9103:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5789


> SSL verification on TaskManager when parallelism > 1
> 
>
> Key: FLINK-9103
> URL: https://issues.apache.org/jira/browse/FLINK-9103
> Project: Flink
>  Issue Type: Bug
>  Components: Docker, Network, Security
>Affects Versions: 1.4.0
>Reporter: Edward Rojas
>Assignee: Edward Rojas
>Priority: Major
> Attachments: job.log, task0.log
>
>
> In dynamic environments like Kubernetes, the SSL certificates can be 
> generated to use only the DNS addresses for validation of the identity of 
> servers, given that the IP can change eventually.
>  
> In these cases, when executing jobs with parallelism set to 1, SSL 
> validation succeeds and the JobManager can communicate with the TaskManager, 
> and vice versa.
>  
> But with parallelism set to more than 1, SSL validation fails when 
> TaskManagers communicate with each other, as it appears to validate against 
> the IP address:
> Caused by: java.security.cert.CertificateException: No subject alternative 
> names matching IP address 172.xx.xxx.xxx found 
> at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168) 
> at sun.security.util.HostnameChecker.match(HostnameChecker.java:94) 
> at 
> sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
>  
> at 
> sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
>  
> at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1601)
>  
> ... 21 more 
>  
> From the logs, it seems the task managers successfully register their full 
> addresses with Netty, but the IP is still used.
>  
> Attached pertinent logs from JobManager and a TaskManager. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5789: [FLINK-9103] Using CanonicalHostName instead of IP...

2018-04-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5789


---


[jira] [Commented] (FLINK-8872) Yarn detached mode via -yd does not detach

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1643#comment-1643
 ] 

ASF GitHub Bot commented on FLINK-8872:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5672


> Yarn detached mode via -yd does not detach
> --
>
> Key: FLINK-8872
> URL: https://issues.apache.org/jira/browse/FLINK-8872
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Running a YARN per-job cluster in detached mode currently does not work: the 
> client still waits for the job to finish.
> Example:
> {code}
> ./bin/flink run -m yarn-cluster -yn 10 -yjm 768 -ytm 3072 -ys 2 -yd -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input
> {code}
> For an infinite program, the output would then end with something like this:
> {code}
> 2018-03-05 13:41:23,311 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for 
> the cluster to be allocated
> 2018-03-05 13:41:23,313 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
> cluster, current state ACCEPTED
> 2018-03-05 13:41:28,342 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - YARN 
> application has been deployed successfully.
> 2018-03-05 13:41:28,343 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The Flink 
> YARN client has been started in detached mode. In order to stop Flink on 
> YARN, use the following command or a YARN web interface to stop it:
> yarn application -kill application_1519984124671_0006
> Please also note that the temporary files of the YARN session in the home 
> directoy will not be removed.
> Starting execution of program
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5672: [FLINK-8872][flip6] fix yarn detached mode command...

2018-04-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5672


---


[jira] [Resolved] (FLINK-8872) Yarn detached mode via -yd does not detach

2018-04-13 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8872.
--
Resolution: Fixed

Fixed via
master: fdd1c6ed0fed229612ecde1565d90a06dbe6ff55
1.5.0: 81e1e4c312b467dc64c664e30a7132a9f7d55140

> Yarn detached mode via -yd does not detach
> --
>
> Key: FLINK-8872
> URL: https://issues.apache.org/jira/browse/FLINK-8872
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Running a YARN per-job cluster in detached mode currently does not work: the 
> client still waits for the job to finish.
> Example:
> {code}
> ./bin/flink run -m yarn-cluster -yn 10 -yjm 768 -ytm 3072 -ys 2 -yd -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input
> {code}
> For an infinite program, the output would then end with something like this:
> {code}
> 2018-03-05 13:41:23,311 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for 
> the cluster to be allocated
> 2018-03-05 13:41:23,313 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
> cluster, current state ACCEPTED
> 2018-03-05 13:41:28,342 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - YARN 
> application has been deployed successfully.
> 2018-03-05 13:41:28,343 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The Flink 
> YARN client has been started in detached mode. In order to stop Flink on 
> YARN, use the following command or a YARN web interface to stop it:
> yarn application -kill application_1519984124671_0006
> Please also note that the temporary files of the YARN session in the home 
> directoy will not be removed.
> Starting execution of program
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437682#comment-16437682
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181467478
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 ---
@@ -196,21 +188,23 @@ public void 
testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw
// the test also ensures that user specific exceptions are 
serializable between JobManager <--> JobClient.
PackagedProgram streamingCheckpointedProg = new 
PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
 
-   TestStreamEnvironment.setAsContext(
-   testCluster,
+   TestEnvironment.setAsContext(
+   MINI_CLUSTER_RESOURCE,
parallelism,
Collections.singleton(new 
Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
Collections.emptyList());
 
-   // Program should terminate with a 'SuccessException':
-   // we can not access the SuccessException here when executing 
the tests with maven, because its not available in the jar.
-   expectedException.expectCause(
-   Matchers.hasProperty("cause",
-   hasProperty("class",
-   hasProperty("canonicalName", equalTo(
-   
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException");
-
-   streamingCheckpointedProg.invokeInteractiveModeForExecution();
+   try {
+   
streamingCheckpointedProg.invokeInteractiveModeForExecution();
+   } catch (Exception e) {
+   // Program should terminate with a 'SuccessException':
+   // we can not access the SuccessException here when 
executing the tests with maven, because its not available in the jar.
+   Optional expectedCause = 
ExceptionUtils.findThrowable(e,
+   candidate -> 
candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"));
--- End diff --

Can't we use `findThrowable(Throwable throwable, Class searchType)`?


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437679#comment-16437679
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181466728
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() {
 
@Override
public void before() throws Exception {
+   miniClusterResourceConfiguration = 
Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get());
--- End diff --

I think we are mixing concerns by letting the `MiniClusterResource` change 
its behaviour across multiple tests. I think we should not pass in a 
configuration supplier which can lazily instantiate a configuration. Instead, if 
you need a different `MiniClusterResource` per test, then the test should 
instantiate the respective `MiniClusterResource` with the proper configuration.
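The suggested pattern can be sketched as a resource whose configuration is fixed at construction time, with each test creating its own instance rather than sharing one resource behind a lazy supplier. (All names below are illustrative, not Flink's actual test-utils API.)

```java
public class MiniClusterResourceSketch {

    // Immutable configuration, decided by the test that owns the resource.
    public static final class Config {
        final int numberSlotsPerTaskManager;

        public Config(int numberSlotsPerTaskManager) {
            this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
        }
    }

    private final Config config;

    public MiniClusterResourceSketch(Config config) {
        // The configuration cannot change across tests: no supplier involved.
        this.config = config;
    }

    public int slots() {
        return config.numberSlotsPerTaskManager;
    }

    public static void main(String[] args) {
        // Each test instantiates its own resource with the configuration it needs.
        MiniClusterResourceSketch testA = new MiniClusterResourceSketch(new Config(1));
        MiniClusterResourceSketch testB = new MiniClusterResourceSketch(new Config(4));
        System.out.println(testA.slots() + " " + testB.slots());
    }
}
```

This keeps each resource's behaviour fixed for its lifetime, at the cost of starting one cluster per configuration instead of reconfiguring a shared one.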


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437680#comment-16437680
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181465966
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -225,6 +247,11 @@ private void startMiniCluster() throws Exception {
}
}
 
+   @Override
+   public JobExecutionResult executeJobBlocking(JobGraph jobGraph) throws 
JobExecutionException, InterruptedException {
--- End diff --

I think we should use the `MiniClusterClient` for job submission. The 
`JobExecutor` interface is only an internal interface to bridge between the two 
different mini clusters.


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437681#comment-16437681
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181467950
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() {
 
@Override
public void before() throws Exception {
+   miniClusterResourceConfiguration = 
Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get());
--- End diff --

The same applies to rule chains. An easy solution would be to create a 
`MiniClusterWithTemporaryFolderResource` which encapsulates this logic.


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437683#comment-16437683
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181468061
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -49,15 +54,17 @@
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
  */
-public class MiniClusterResource extends ExternalResource {
+public class MiniClusterResource extends ExternalResource implements 
JobExecutor {
--- End diff --

Why do we need to implement `JobExecutor`?


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...

2018-04-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181467478
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 ---
@@ -196,21 +188,23 @@ public void 
testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw
// the test also ensures that user specific exceptions are 
serializable between JobManager <--> JobClient.
PackagedProgram streamingCheckpointedProg = new 
PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
 
-   TestStreamEnvironment.setAsContext(
-   testCluster,
+   TestEnvironment.setAsContext(
+   MINI_CLUSTER_RESOURCE,
parallelism,
Collections.singleton(new 
Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
Collections.emptyList());
 
-   // Program should terminate with a 'SuccessException':
-   // we can not access the SuccessException here when executing 
the tests with maven, because its not available in the jar.
-   expectedException.expectCause(
-   Matchers.hasProperty("cause",
-   hasProperty("class",
-   hasProperty("canonicalName", equalTo(
-   
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException");
-
-   streamingCheckpointedProg.invokeInteractiveModeForExecution();
+   try {
+   
streamingCheckpointedProg.invokeInteractiveModeForExecution();
+   } catch (Exception e) {
+   // Program should terminate with a 'SuccessException':
+   // we can not access the SuccessException here when 
executing the tests with maven, because its not available in the jar.
+   Optional expectedCause = 
ExceptionUtils.findThrowable(e,
+   candidate -> 
candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"));
--- End diff --

Can't we use `findThrowable(Throwable throwable, Class searchType)`?


---


[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...

2018-04-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181465966
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -225,6 +247,11 @@ private void startMiniCluster() throws Exception {
}
}
 
+   @Override
+   public JobExecutionResult executeJobBlocking(JobGraph jobGraph) throws 
JobExecutionException, InterruptedException {
--- End diff --

I think we should use the `MiniClusterClient` for job submission. The 
`JobExecutor` interface is only an internal interface to bridge between the two 
different mini clusters.


---


[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...

2018-04-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181467950
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() {
 
@Override
public void before() throws Exception {
+   miniClusterResourceConfiguration = 
Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get());
--- End diff --

The same applies to rule chains. An easy solution would be to create a 
`MiniClusterWithTemporaryFolderResource` which encapsulates this logic.
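
The suggested encapsulation boils down to one resource that drives both lifecycles in order (set-up forward, tear-down reversed, like a JUnit `RuleChain`); a dependency-free sketch of that composition, with a minimal stand-in for `ExternalResource`:

```java
import java.util.Arrays;
import java.util.List;

public class CompositeResourceSketch {

    /** Minimal stand-in for JUnit's ExternalResource before/after lifecycle. */
    interface Resource {
        void before();
        void after();
    }

    /** Starts the wrapped resources in order and tears them down in reverse order. */
    static class CompositeResource implements Resource {
        private final List<Resource> resources;

        CompositeResource(Resource... resources) {
            this.resources = Arrays.asList(resources);
        }

        @Override
        public void before() {
            for (Resource r : resources) {
                r.before();
            }
        }

        @Override
        public void after() {
            // reverse order, so the folder outlives the cluster that uses it
            for (int i = resources.size() - 1; i >= 0; i--) {
                resources.get(i).after();
            }
        }
    }

    public static void main(String[] args) {
        Resource folder = named("temp folder");
        Resource cluster = named("mini cluster");
        Resource both = new CompositeResource(folder, cluster);
        both.before();
        both.after();
    }

    private static Resource named(String name) {
        return new Resource() {
            @Override public void before() { System.out.println(name + " up"); }
            @Override public void after() { System.out.println(name + " down"); }
        };
    }
}
```

A real `MiniClusterWithTemporaryFolderResource` would wrap a `TemporaryFolder` and a `MiniClusterResource` the same way.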


---


[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...

2018-04-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181466728
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() {
 
@Override
public void before() throws Exception {
+   miniClusterResourceConfiguration = 
Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get());
--- End diff --

I think we are mixing concerns by letting the `MiniClusterResource` change 
its behaviour across multiple tests. I think we should not pass in a 
configuration supplier which can lazily instantiate a configuration. Instead if 
you need a different `MiniClusterResource` per test, then the test should 
instantiate the respective `MiniClusterResource` with the proper configuration.


---


[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...

2018-04-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5780#discussion_r181468061
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -49,15 +54,17 @@
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
  */
-public class MiniClusterResource extends ExternalResource {
+public class MiniClusterResource extends ExternalResource implements JobExecutor {
--- End diff --

Why do we need to implement `JobExecutor`?


---


[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437667#comment-16437667
 ] 

ASF GitHub Bot commented on FLINK-8955:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5780
  
The `ClassLoaderITCase` fails on Travis. There are also checkstyle 
violations: 
```
10:04:01.988 [ERROR] src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java:[62,15] (imports) UnusedImports: Unused import: org.junit.Assert.assertEquals.
10:04:01.988 [ERROR] src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java:[65,15] (imports) UnusedImports: Unused import: org.junit.Assert.fail.
```


> Port ClassLoaderITCase to flip6
> ---
>
> Key: FLINK-8955
> URL: https://issues.apache.org/jira/browse/FLINK-8955
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-04-13 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437604#comment-16437604
 ] 

Till Rohrmann commented on FLINK-9143:
--

The problem is that we don't necessarily have the cluster configuration when we 
submit the job. Consequently, it is a priori not possible to decide whether the 
cluster has a default restart strategy or not.

The underlying problem with the restart strategies is that it is a cluster 
configuration as well as a job configuration option. I think it should only be 
a cluster configuration option. If we don't want to break the behavior, then a 
solution could be to differentiate between an explicitly set job-specific 
restart strategy and the default job-specific restart strategy set when 
submitting a streaming job with checkpointing enabled.

> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded, when user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download flink distribution (1.4.2), update flink-conf.yaml:
>  
> restart-strategy: none
> state.backend: rocksdb
> state.backend.fs.checkpointdir: 
> file:///tmp/nfsrecovery/flink-checkpoints-metadata
> state.backend.rocksdb.checkpointdir: 
> file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>  
> 2. create new java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
> here's the code:
> public class FailedJob
> {
>     static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>     public static void main( String[] args ) throws Exception
>     {
>         final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> >         DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
> >         stream.map(new MapFunction<String, String>(){
>             @Override
>             public String map(String obj) {
>                 throw new NullPointerException("NPE");
>             } 
>         });
>         env.execute("Failed job");
>     }
> }
>  
> 3. Compile: mvn clean package; submit it to the cluster
>  
> 4. Go to Job Manager configuration in WebUI, ensure settings from 
> flink-conf.yaml is there (screenshot attached)
>  
> 5. Go to Job's configuration, see Execution Configuration section
>  
> *Expected result*: restart strategy as defined in flink-conf.yaml
>  
> *Actual result*: Restart with fixed delay (1 ms). 
> #2147483647 restart attempts.
>  
>  
> see attached screenshots and jobmanager log (line 1 and 31)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437601#comment-16437601
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

GitHub user XiaoZYang opened a pull request:

https://github.com/apache/flink/pull/5845

[FLINK-9168][flink-connectors]Pulsar Sink connector

## What is the purpose of the change
Provide a [pulsar](https://github.com/apache/incubator-pulsar) sink 
connector for flink.

## Brief change log
- add `PulsarTableSink`
- add `PulsarJsonTableSink`

## Verifying this change

Added test that validates that target classes (i.e. PulsarTableSink and 
PulsarJsonTableSink) work as expected

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes) upgrades 
`org.javassist` version to 3.20.0-GA
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (don't know)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/XiaoZYang/flink pulsar-connector

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5845.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5845


commit fff36440197e3f34caf3af0b7ed7ba5320003e3b
Author: Sijie Guo 
Date:   2018-03-02T05:08:53Z

Add pulsar flink connector

commit 447a998ce9d4de2dbb7b099b0568c5aed9f374bd
Author: xiaozongyang 
Date:   2018-04-13T07:51:49Z

1.update pom.xml to follow flink version




> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Created] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-04-13 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9169:


 Summary: NPE when restoring from old savepoint and state 
serializer could not be deserialized
 Key: FLINK-9169
 URL: https://issues.apache.org/jira/browse/FLINK-9169
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2, 1.5.0
Reporter: Till Rohrmann


A user reported to have observed the following exception when restoring a Flink 
job from a 1.3 savepoint with Flink 1.4.

{code}
2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
        at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
        ... 6 more
{code}

Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
{{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the {{stateSerializer}} 
can be {{null}}. This is not the problem, however, in 
{{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
{{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} 
where we null-check the state serializer. This will then fail with an 
uninformative NPE.
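
For illustration, a `checkNotNull` variant that carries an error message (Flink's `Preconditions` has such an overload) turns the bare NPE described above into a diagnosable failure; this is a standalone sketch, not the Flink class:

```java
public class CheckNotNullSketch {

    /** Sketch of a checkNotNull overload that carries an error message into the NPE. */
    static <T> T checkNotNull(T reference, String errorMessage) {
        if (reference == null) {
            throw new NullPointerException(errorMessage);
        }
        return reference;
    }

    public static void main(String[] args) {
        Object stateSerializer = null; // e.g. the serializer could not be deserialized on restore
        try {
            checkNotNull(stateSerializer,
                "The state serializer could not be deserialized from the savepoint; "
                    + "was the serializer class dropped from the user jar?");
        } catch (NullPointerException e) {
            // the failure now says *what* was null instead of only where it was null
            System.out.println(e.getMessage());
        }
    }
}
```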

I think the same should happen when resuming with Flink 1.5 from a 1.4 
savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437396#comment-16437396
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181414763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
 ---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+/**
+ * An interface for accumulating double values.
+ */
+interface DoubleAccumulator {
--- End diff --

This is virtually the same as 
`org.apache.flink.runtime.rest.handler.legacy.metrics.DoubleAccumulator`. Why 
do we have to duplicate it?

```
diff ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java
19c19
< package org.apache.flink.runtime.rest.handler.legacy.metrics;
---
> package org.apache.flink.runtime.rest.handler.job.metrics;
```


> Port AbstractAggregatingMetricsHandler to RestServerEndpoint
> 
>
> Key: FLINK-8370
> URL: https://issues.apache.org/jira/browse/FLINK-8370
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port subclasses of 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractAggregatingMetricsHandler}}
>  to new FLIP-6 {{RestServerEndpoint}}.
> The following handlers need to be migrated:
> * {{AggregatingJobsMetricsHandler}}
> * {{AggregatingSubtasksMetricsHandler}}
> * {{AggregatingTaskManagersMetricsHandler}}
> New handlers should then be registered in {{WebMonitorEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437392#comment-16437392
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181408771
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+   public AggregatingSubtasksMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   Executor executor,
+   MetricFetcher fetcher) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+   }
+
+   @Override
+   Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+   JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+   JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+   Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+   if (subtaskRanges.isEmpty()) {
+   return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
--- End diff --

I think there is a potential NPE because 
`store.getTaskMetricStore(jobID.toString(), taskID.toString())` can return 
`null`.
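
A guard along the lines GJL suggests — sketched here with a plain map standing in for `MetricStore`, all names illustrative rather than the actual Flink API — avoids the chained call on a possibly-null store:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullSafeLookupSketch {

    /** Plain map standing in for the MetricStore task-store lookup. */
    static Collection<String> getSubtaskStores(Map<String, Collection<String>> taskStores,
                                               String jobId, String vertexId) {
        Collection<String> store = taskStores.get(jobId + "/" + vertexId);
        if (store == null) {
            // make the "unknown job/task" case explicit instead of NPE-ing on a chained call
            return Collections.emptyList();
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> stores = new HashMap<>();
        stores.put("job1/vertexA", List.of("subtask-0", "subtask-1"));
        System.out.println(getSubtaskStores(stores, "job1", "vertexA")); // [subtask-0, subtask-1]
        System.out.println(getSubtaskStores(stores, "job1", "unknown")); // []
    }
}
```

A handler could equally map the null case to an explicit 404-style error instead of an empty result; the point is that the null never reaches a chained method call.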


> Port AbstractAggregatingMetricsHandler to RestServerEndpoint
> 
>
> Key: FLINK-8370
> URL: https://issues.apache.org/jira/browse/FLINK-8370
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
> 

[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437388#comment-16437388
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181401972
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Locale;
+
+/**
+ * TODO: add javadoc.
--- End diff --

Should be replaced.


> Port AbstractAggregatingMetricsHandler to RestServerEndpoint
> 
>
> Key: FLINK-8370
> URL: https://issues.apache.org/jira/browse/FLINK-8370
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port subclasses of 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractAggregatingMetricsHandler}}
>  to new FLIP-6 {{RestServerEndpoint}}.
> The following handlers need to be migrated:
> * {{AggregatingJobsMetricsHandler}}
> * {{AggregatingSubtasksMetricsHandler}}
> * {{AggregatingTaskManagersMetricsHandler}}
> New handlers should then be registered in {{WebMonitorEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437391#comment-16437391
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181411248
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+   public AggregatingSubtasksMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   Executor executor,
+   MetricFetcher fetcher) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+   }
+
+   @Override
+   Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+   JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+   JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+   Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+   if (subtaskRanges.isEmpty()) {
+   return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
+   } else {
+   Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+   Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+   for (int subtask : subtasks) {
+   subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
+   }
+   return subtaskStores;
+   }
+   }
+
+   private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
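
The quoted diff is cut off before the body of the range parser. As an illustration only (names hypothetical, not the handler's real code), expanding "0-2,4-5"-style range strings as described in the javadoc can be sketched as:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class RangeParseSketch {

    /** Expands "0-2,4-5"-style range strings into the individual integers. */
    static Iterable<Integer> parseRanges(Collection<String> ranges) {
        List<Integer> result = new ArrayList<>();
        for (String range : ranges) {
            for (String part : range.split(",")) {
                int dash = part.indexOf('-');
                if (dash < 0) {
                    // a bare index like "7"
                    result.add(Integer.parseInt(part.trim()));
                } else {
                    // an inclusive range like "0-2"
                    int from = Integer.parseInt(part.substring(0, dash).trim());
                    int to = Integer.parseInt(part.substring(dash + 1).trim());
                    for (int i = from; i <= to; i++) {
                        result.add(i);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseRanges(List.of("0-2,4-5"))); // [0, 1, 2, 4, 5]
    }
}
```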
 

[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437389#comment-16437389
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181404252
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses 
return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the 
requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
+ *
+ * The "agg" query parameter is used to define which aggregates should 
be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all 
aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the 
aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", 
"4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+   private final Executor executor;
+   private final MetricFetcher fetcher;
+
+   protected AbstractAggregatingMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   
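The javadoc in the diff above documents the "agg" query parameter (sum, max, min, avg). A rough, self-contained sketch of that aggregation step — assuming, as in Flink's MetricStore, that metric values arrive as strings; class and method names here are illustrative, not Flink's:

```java
import java.util.*;

// Hedged sketch of the min/max/sum/avg aggregation the handler computes
// across entities. Names are illustrative, not Flink's actual API.
public class MetricAggregationSketch {

    // Aggregates string-typed metric values (metrics are reported as
    // strings) into the four supported aggregations.
    static Map<String, Double> aggregate(Collection<String> rawValues) {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double sum = 0.0;
        for (String raw : rawValues) {
            double v = Double.parseDouble(raw);
            min = Math.min(min, v);
            max = Math.max(max, v);
            sum += v;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        result.put("min", min);
        result.put("max", max);
        result.put("sum", sum);
        result.put("avg", sum / rawValues.size());
        return result;
    }

    public static void main(String[] args) {
        // e.g. three subtask values for one metric id
        System.out.println(aggregate(Arrays.asList("1", "4", "10")));
        // prints {min=1.0, max=10.0, sum=15.0, avg=5.0}
    }
}
```

A real implementation would also have to decide how to handle non-numeric metric values; the sketch simply lets parseDouble throw.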

[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437393#comment-16437393
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181411022
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all 
available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a 
comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+   public AggregatingSubtasksMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   Executor executor,
+   MetricFetcher fetcher) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, 
fetcher);
+   }
+
+   @Override
+   Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   JobVertexID taskID = 
request.getPathParameter(JobVertexIdPathParameter.class);
+
+   Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+   if (subtaskRanges.isEmpty()) {
+   return store.getTaskMetricStore(jobID.toString(), 
taskID.toString()).getAllSubtaskMetricStores();
+   } else {
+   Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+   Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+   for (int subtask : subtasks) {
+   
subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), 
taskID.toString(), subtask));
+   }
+   return subtaskStores;
+   }
+   }
+
+   private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
 

[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437390#comment-16437390
 ] 

ASF GitHub Bot commented on FLINK-8370:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181404370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses 
return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the 
requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
+ *
+ * The "agg" query parameter is used to define which aggregates should 
be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all 
aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the 
aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", 
"4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+   private final Executor executor;
+   private final MetricFetcher fetcher;
+
+   protected AbstractAggregatingMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181404252
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses 
return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the 
requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
+ *
+ * The "agg" query parameter is used to define which aggregates should 
be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all 
aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the 
aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", 
"4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+   private final Executor executor;
+   private final MetricFetcher fetcher;
+
+   protected AbstractAggregatingMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   AbstractAggregatedMetricsHeaders<P> messageHeaders,
+   Executor executor,
+   MetricFetcher 

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181411022
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all 
available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a 
comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+   public AggregatingSubtasksMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   Executor executor,
+   MetricFetcher fetcher) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, 
fetcher);
+   }
+
+   @Override
+   Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   JobVertexID taskID = 
request.getPathParameter(JobVertexIdPathParameter.class);
+
+   Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+   if (subtaskRanges.isEmpty()) {
+   return store.getTaskMetricStore(jobID.toString(), 
taskID.toString()).getAllSubtaskMetricStores();
+   } else {
+   Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+   Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+   for (int subtask : subtasks) {
+   
subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), 
taskID.toString(), subtask));
+   }
+   return subtaskStores;
+   }
+   }
+
+   private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
+   UnionIterator<Integer> iterators = new UnionIterator<>();
+
+   for (String rawRange : ranges) {
+   try {
+   Iterator<Integer> rangeIterator;
+   
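For context, getIntegerRangeFromString expands entries such as "4" or "0-2" into subtask indices. A minimal stand-alone re-implementation of that parsing, returning a flat list instead of Flink's UnionIterator (names are illustrative, not Flink's):

```java
import java.util.*;

// Illustrative re-implementation of the range parsing quoted above: each
// entry is either a single subtask index ("4") or an inclusive range
// ("0-2"). Flink unions lazy iterators; a flat list is used here for
// readability only.
public class SubtaskRangeSketch {

    static List<Integer> parseRanges(Collection<String> ranges) {
        List<Integer> result = new ArrayList<>();
        for (String rawRange : ranges) {
            String range = rawRange.trim();
            int dash = range.indexOf('-');
            if (dash == -1) {
                result.add(Integer.parseInt(range));          // single index
            } else {
                int start = Integer.parseInt(range.substring(0, dash));
                int end = Integer.parseInt(range.substring(dash + 1));
                for (int i = start; i <= end; i++) {          // inclusive range
                    result.add(i);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseRanges(Arrays.asList("0-2", "4")));
        // prints [0, 1, 2, 4]
    }
}
```

The real method also has to report malformed ranges back to the client; the diff above is cut off right where that error handling begins.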

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181408771
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all 
available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a 
comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+   public AggregatingSubtasksMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   Executor executor,
+   MetricFetcher fetcher) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, 
fetcher);
+   }
+
+   @Override
+   Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   JobVertexID taskID = 
request.getPathParameter(JobVertexIdPathParameter.class);
+
+   Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+   if (subtaskRanges.isEmpty()) {
+   return store.getTaskMetricStore(jobID.toString(), 
taskID.toString()).getAllSubtaskMetricStores();
--- End diff --

I think there is a potential NPE because 
`store.getTaskMetricStore(jobID.toString(), taskID.toString())` can return 
`null`.


---
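A self-contained illustration of the guard GJL is asking for: the store lookup that can return null is checked before being dereferenced. The store shape and the empty-collection fallback are assumptions for the sketch; the real fix would more likely throw a RestHandlerException with a 404 status.

```java
import java.util.*;

// Hedged sketch of the NPE fix suggested above. In the handler,
// store.getTaskMetricStore(job, task) can return null for an unknown
// job/task; this stand-in shows the guard with illustrative names.
public class NullGuardSketch {

    // Stand-in for the MetricStore: maps "jobId/taskId" to its
    // per-subtask metric stores.
    static final Map<String, List<String>> taskStores = new HashMap<>();

    static List<String> getAllSubtaskMetricStores(String jobId, String taskId) {
        List<String> taskStore = taskStores.get(jobId + "/" + taskId); // may be null
        if (taskStore == null) {
            // Assumption: fall back to an empty result here. The actual
            // handler would raise a RestHandlerException with
            // HttpResponseStatus.NOT_FOUND instead of an unchecked NPE.
            return Collections.emptyList();
        }
        return taskStore;
    }

    public static void main(String[] args) {
        taskStores.put("job-1/task-0", Arrays.asList("subtask-0", "subtask-1"));
        System.out.println(getAllSubtaskMetricStores("job-1", "task-0").size()); // prints 2
        System.out.println(getAllSubtaskMetricStores("job-1", "missing").size()); // prints 0
    }
}
```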


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181404370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract request handler for querying aggregated metrics. Subclasses 
return either a list of all available metrics
+ * or the aggregated values of them across all/selected entities.
+ *
+ * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the 
requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
+ *
+ * The "agg" query parameter is used to define which aggregates should 
be calculated. Available aggregations are
+ * "sum", "max", "min" and "avg". If the parameter is not specified, all 
aggregations will be returned.
+ * {@code /metrics?get=X,Y&agg=min,max}
+ * The handler will then return a list of objects containing the 
aggregations for the requested metrics.
+ * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", 
"4", "max", "10"}]}
+ */
+public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> {
+
+   private final Executor executor;
+   private final MetricFetcher fetcher;
+
+   protected AbstractAggregatingMetricsHandler(
+   CompletableFuture<String> localRestAddress,
+   GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+   Time timeout,
+   Map<String, String> responseHeaders,
+   AbstractAggregatedMetricsHeaders<P> messageHeaders,
+   Executor executor,
+   MetricFetcher 

[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181401972
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Locale;
+
+/**
+ * TODO: add javadoc.
--- End diff --

Should be replaced.


---


[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...

2018-04-13 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5805#discussion_r181411248
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all
+ * available metrics or the values for a set of metrics.
+ *
+ * <p>Specific subtasks can be selected for aggregation by specifying a
+ * comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler
+		extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+	public AggregatingSubtasksMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			Executor executor,
+			MetricFetcher fetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders,
+			AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+	}
+
+	@Override
+	Collection<? extends MetricStore.ComponentMetricStore> getStores(
+			MetricStore store,
+			HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+		JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+		Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+		if (subtaskRanges.isEmpty()) {
+			return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
+		} else {
+			Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+			for (int subtask : subtasks) {
+				subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
+			}
+			return subtaskStores;
+		}
+	}
+
+	private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
+		UnionIterator<Integer> iterators = new UnionIterator<>();
+
+		for (String rawRange : ranges) {
+			try {
+				Iterator<Integer> rangeIterator;
+   
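
The quoted diff breaks off inside `getIntegerRangeFromString`. As a rough, self-contained sketch of what expanding a comma-separated range string such as `0-2,4-5` involves (a hypothetical helper, not the actual Flink implementation, and without handling for malformed or negative input):

```java
import java.util.ArrayList;
import java.util.List;

public class RangeParser {
    // Expands "0-2,4-5" into [0, 1, 2, 4, 5]; single subtask indices like "7"
    // are allowed as well.
    static List<Integer> parseRanges(String ranges) {
        List<Integer> result = new ArrayList<>();
        for (String raw : ranges.split(",")) {
            int dash = raw.indexOf('-');
            if (dash == -1) {
                result.add(Integer.parseInt(raw.trim()));
            } else {
                int start = Integer.parseInt(raw.substring(0, dash).trim());
                int end = Integer.parseInt(raw.substring(dash + 1).trim());
                for (int i = start; i <= end; i++) {
                    result.add(i);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseRanges("0-2,4-5")); // prints [0, 1, 2, 4, 5]
    }
}
```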

[jira] [Commented] (FLINK-9165) Improve CassandraSinkBase to send Collections

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437261#comment-16437261
 ] 

ASF GitHub Bot commented on FLINK-9165:
---

Github user Bekreth closed the pull request at:

https://github.com/apache/flink/pull/5844


> Improve CassandraSinkBase to send Collections
> -
>
> Key: FLINK-9165
> URL: https://issues.apache.org/jira/browse/FLINK-9165
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.2
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: easyfix, feature
>
> The CassandraSinkBase can currently only handle individual objects.  I 
> propose overloading the `send(IN value)` method to include 
> `send(Collection value)`. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5844: [FLINK-9165] [DataSink] Upgrading CassandraSinkBas...

2018-04-13 Thread Bekreth
Github user Bekreth closed the pull request at:

https://github.com/apache/flink/pull/5844


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-04-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5634
  
@juhoautio

Your concerns about this fix are quite valid, and they are why this PR was closed 
in the first place: it introduces a lot of ill-defined semantics.

Regarding your thought on relating idleness to empty results returned from 
Kafka: I think that seems like a good approach, and it should also address Eron's 
comment quite well.


---


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437175#comment-16437175
 ] 

ASF GitHub Bot commented on FLINK-5479:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5634
  
@juhoautio

Your concerns about this fix are quite valid, and they are why this PR was closed 
in the first place: it introduces a lot of ill-defined semantics.

Regarding your thought on relating idleness to empty results returned from 
Kafka: I think that seems like a good approach, and it should also address Eron's 
comment quite well.


> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9168) Pulsar Sink Connector

2018-04-13 Thread Zongyang Xiao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437030#comment-16437030
 ] 

Zongyang Xiao edited comment on FLINK-9168 at 4/13/18 9:01 AM:
---

Related work will be done by [~si...@apache.org], [~zhaijia] and 
[~xiaozongyang_bupt].


was (Author: xiaozongyang_bupt):
Related work will be done by [~si...@apache.org] and [~xiaozongyang_bupt].

> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Zongyang Xiao
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-04-13 Thread Xiao Zongyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437030#comment-16437030
 ] 

Xiao Zongyang commented on FLINK-9168:
--

Related work will be done by [~si...@apache.org] and [~xiaozongyang_bupt].

> Pulsar Sink Connector
> -
>
> Key: FLINK-9168
> URL: https://issues.apache.org/jira/browse/FLINK-9168
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Xiao Zongyang
>Priority: Minor
> Fix For: 1.6.0
>
>
> Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9168) Pulsar Sink Connector

2018-04-13 Thread Xiao Zongyang (JIRA)
Xiao Zongyang created FLINK-9168:


 Summary: Pulsar Sink Connector
 Key: FLINK-9168
 URL: https://issues.apache.org/jira/browse/FLINK-9168
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Affects Versions: 1.6.0
Reporter: Xiao Zongyang
 Fix For: 1.6.0


Flink does not provide a sink connector for Pulsar.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437021#comment-16437021
 ] 

ASF GitHub Bot commented on FLINK-5479:
---

Github user juhoautio commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tzulitai did you ever test your code? I tried it and it allowed watermarks 
to proceed but apparently too aggressively, as it caused a lot of data loss.

I'm looking for a quick fix for this issue, as it seems that FLINK-5479 
won't be fixed any time soon. So I would very much like to hear whether you have 
been able to fix this in some lighter way.

My understanding of your PR is that it doesn't work reliably because it 
just adds an internal timeout, which could be exceeded whenever the 
consumer is, for example, busy consuming other partitions. Please comment if this 
impression is wrong.

I'm thinking that it should instead get the information that a partition 
was idle from the Kafka client, and only in that case (empty result from the 
client) create a newer watermark for that partition. It shouldn't mark the 
partition as idle, and it shouldn't create newer watermarks 
periodically without any connection to another empty result from the client. 
Shouldn't new watermarks only be generated as a callback on the Kafka client 
result?


> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-04-13 Thread juhoautio
Github user juhoautio commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tzulitai did you ever test your code? I tried it and it allowed watermarks 
to proceed but apparently too aggressively, as it caused a lot of data loss.

I'm looking for a quick fix for this issue, as it seems that FLINK-5479 
won't be fixed any time soon. So I would very much like to hear whether you have 
been able to fix this in some lighter way.

My understanding of your PR is that it doesn't work reliably because it 
just adds an internal timeout, which could be exceeded whenever the 
consumer is, for example, busy consuming other partitions. Please comment if this 
impression is wrong.

I'm thinking that it should instead get the information that a partition 
was idle from the Kafka client, and only in that case (empty result from the 
client) create a newer watermark for that partition. It shouldn't mark the 
partition as idle, and it shouldn't create newer watermarks 
periodically without any connection to another empty result from the client. 
Shouldn't new watermarks only be generated as a callback on the Kafka client 
result?
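
The idea proposed above, advancing a partition's watermark only when the client returns an empty poll for that partition, can be sketched as follows. This is a hypothetical illustration of the bookkeeping, not Flink's actual `AbstractFetcher`:

```java
import java.util.HashMap;
import java.util.Map;

public class PerPartitionWatermarks {
    private final Map<Integer, Long> watermarks = new HashMap<>();

    // Called when a poll returned records for the partition; tracks the
    // maximum record timestamp seen so far.
    void onRecords(int partition, long maxTimestamp) {
        watermarks.merge(partition, maxTimestamp, Math::max);
    }

    // Called when a poll returned no data for the partition: advance its
    // watermark to the current processing time so the idle partition no
    // longer holds back the overall minimum (the approach suggested above).
    void onEmptyPoll(int partition, long now) {
        watermarks.merge(partition, now, Math::max);
    }

    // The overall watermark is the minimum across all known partitions.
    long overallWatermark() {
        return watermarks.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(Long.MIN_VALUE);
    }
}
```

Tying `onEmptyPoll` to an actual empty fetch result, rather than a wall-clock timeout, is exactly what distinguishes this sketch from the timeout-based PR being discussed.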


---


[jira] [Commented] (FLINK-6557) RocksDBKeyedStateBackend broken on Windows

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436999#comment-16436999
 ] 

ASF GitHub Bot commented on FLINK-6557:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/3899


> RocksDBKeyedStateBackend broken on Windows
> --
>
> Key: FLINK-6557
> URL: https://issues.apache.org/jira/browse/FLINK-6557
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.3.0, 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> The {{RocksDBKeyedStateBackend}} cannot be used on Windows. We pass the 
> result of {{org.apache.flink.core.fs.Path#getPath()}} to RocksDB which will 
> fail when trying to create the checkpoint file as the path begins with a 
> slash which isn't a valid Windows path:
> {code}
> /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
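
The invalid path quoted in the FLINK-6557 description above, `/C:/Users/...`, comes from handing a URI-style path to RocksDB. One conceivable normalization, shown purely as an assumption and not the actual FLINK-6557 patch, is to strip the leading slash when a drive letter follows:

```java
public class PathFix {
    // Heuristic sketch: drop the leading slash from URI-style paths such as
    // "/C:/Users/..." so they become valid Windows paths, while leaving
    // POSIX-style paths like "/tmp/..." untouched.
    static String toLocalPath(String uriPath) {
        if (uriPath.length() >= 3
                && uriPath.charAt(0) == '/'
                && Character.isLetter(uriPath.charAt(1))
                && uriPath.charAt(2) == ':') {
            return uriPath.substring(1);
        }
        return uriPath;
    }
}
```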


[GitHub] flink pull request #3899: [FLINK-6557] Fix RocksDBKeyedStateBackend on Windo...

2018-04-13 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/3899


---


[jira] [Comment Edited] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config

2018-04-13 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436997#comment-16436997
 ] 

Chesnay Schepler edited comment on FLINK-8867 at 4/13/18 8:22 AM:
--

FLINK-6557 may be relevant as it is also caused by passing Flink `Paths` to 
RocksDB.


was (Author: zentol):
FLINK-6557 may be relevant as it is also caused by passing Flink `Path` for 
local files to RocksDB.

> Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
> 
>
> Key: FLINK-8867
> URL: https://issues.apache.org/jira/browse/FLINK-8867
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, State Backends, Checkpointing, YARN
>Affects Versions: 1.4.1, 1.4.2
>Reporter: Shashank Agarwal
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.5.0
>
>
> In our setup, we put an entry in our flink-conf file for the default scheme:
> {code}
> fs.default-scheme: hdfs://mydomain.com:8020/flink
> {code}
> Then an application with the RocksDB state backend fails with the following 
> exception. When we remove this config it works fine, and it works fine with 
> other state backends.
> {code}
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 
> for operator order ip stream (1/1).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator order ip stream (1/1).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 more
>   Caused by: java.lang.IllegalStateException
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   [CIRCULAR REFERENCE:java.lang.IllegalStateException]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config

2018-04-13 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436997#comment-16436997
 ] 

Chesnay Schepler commented on FLINK-8867:
-

FLINK-6557 may be relevant as it is also caused by passing Flink `Path` for 
local files to RocksDB.

> Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
> 
>
> Key: FLINK-8867
> URL: https://issues.apache.org/jira/browse/FLINK-8867
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, State Backends, Checkpointing, YARN
>Affects Versions: 1.4.1, 1.4.2
>Reporter: Shashank Agarwal
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.5.0
>
>
> In our setup, we put an entry in our flink-conf file for the default scheme:
> {code}
> fs.default-scheme: hdfs://mydomain.com:8020/flink
> {code}
> Then an application with the RocksDB state backend fails with the following 
> exception. When we remove this config it works fine, and it works fine with 
> other state backends.
> {code}
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 
> for operator order ip stream (1/1).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1 for 
> operator order ip stream (1/1).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>   ... 7 more
>   Caused by: java.lang.IllegalStateException
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>   ... 5 more
>   [CIRCULAR REFERENCE:java.lang.IllegalStateException]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9165) Improve CassandraSinkBase to send Collections

2018-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436994#comment-16436994
 ] 

ASF GitHub Bot commented on FLINK-9165:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5844
  
Even then `invoke(Collection<IN>)` would never be called. `IN` is always 
the type of the `DataStream` the sink is applied on, and it can't be 
`Collection<IN>`, as you would have an infinitely recursive type.

You need to create a subclass that extends 
`CassandraSinkBase<Collection<IN>>`, which can then be applied to a 
`DataStream<Collection<IN>>`, and that you can already do. Note that it 
is rather unusual for the stream to contain collections, so I doubt we would be 
adding much value here.

If instead you want a sink of type `IN` that buffers elements and sends 
them in batches, you can use the `CassandraWriteAheadSink`.

Finally, do note that Cassandra batches are a far cry from transactions. 
IIRC the guarantees are such that either the batch completes successfully, 
in which case all elements are written, or it fails, in which case _some_ records 
may have been written.


> Improve CassandraSinkBase to send Collections
> -
>
> Key: FLINK-9165
> URL: https://issues.apache.org/jira/browse/FLINK-9165
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.2
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: easyfix, feature
>
> The CassandraSinkBase can currently only handle individual objects.  I 
> propose overloading the `send(IN value)` method to include 
> `send(Collection value)`. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5844: [FLINK-9165] [DataSink] Upgrading CassandraSinkBase to su...

2018-04-13 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5844
  
Even then `invoke(Collection<IN>)` would never be called. `IN` is always 
the type of the `DataStream` the sink is applied on, and it can't be 
`Collection<IN>`, as you would have an infinitely recursive type.

You need to create a subclass that extends 
`CassandraSinkBase<Collection<IN>>`, which can then be applied to a 
`DataStream<Collection<IN>>`, and that you can already do. Note that it 
is rather unusual for the stream to contain collections, so I doubt we would be 
adding much value here.

If instead you want a sink of type `IN` that buffers elements and sends 
them in batches, you can use the `CassandraWriteAheadSink`.

Finally, do note that Cassandra batches are a far cry from transactions. 
IIRC the guarantees are such that either the batch completes successfully, 
in which case all elements are written, or it fails, in which case _some_ records 
may have been written.
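
The subclassing approach described above, a sink typed over `Collection<T>` that fans elements out to a per-element sink, can be sketched with simplified stand-in types (`SinkBase` here is illustrative, not Flink's `CassandraSinkBase`):

```java
import java.util.Collection;

// Minimal stand-in for a sink interface; Flink's real sinks carry more context.
abstract class SinkBase<IN> {
    abstract void invoke(IN value);
}

// A sink over Collection<T> built by composition: each element of the incoming
// collection is forwarded to a per-element sink. Because IN is bound to
// Collection<T>, no overload of invoke() is needed.
class CollectionSink<T> extends SinkBase<Collection<T>> {
    private final SinkBase<T> elementSink;

    CollectionSink(SinkBase<T> elementSink) {
        this.elementSink = elementSink;
    }

    @Override
    void invoke(Collection<T> values) {
        for (T value : values) {
            elementSink.invoke(value);
        }
    }
}
```

Note this fan-out gives no atomicity across the collection, matching the caveat above that Cassandra batches are not transactions.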


---


[jira] [Updated] (FLINK-9166) Performance issue with Flink SQL

2018-04-13 Thread SUBRAMANYA SURESH (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SUBRAMANYA SURESH updated FLINK-9166:
-
Description: 
With a high number of Flink SQL queries (100 of the query below), the Flink 
command-line client fails with "JobManager did not respond within 60 ms" on a 
YARN cluster.
 * The JobManager log has nothing after the last TaskManager started except DEBUG 
entries with "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in 
JobManager", indicating it is likely stuck (creating the ExecutionGraph?).
 * The same code works as a standalone Java program locally (high CPU initially).
 * Note: each Row in structStream contains 515 columns (many end up null), 
including a column that holds the raw message.
 * In the YARN cluster we specify 18GB for each TaskManager, 18GB for the 
JobManager, 145 TaskManagers with 5 slots each, and a parallelism of 725 
(the number of partitions in our Kafka source).

*Query:*
{code:java}
 select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp, 
EventTimestamp, RawMsg, Source 
 from structStream
 where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' 
and Outcome='Success'
 group by tumble(proctime, INTERVAL '1' SECOND), Environment, 
CollectedTimestamp, EventTimestamp, RawMsg, Source
{code}
*Code:*
{code:java}
public static void main(String[] args) throws Exception {
  FileSystems.newFileSystem(
      KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(), new HashMap<>());

  final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
  final StreamTableEnvironment tableEnv =
      TableEnvironment.getTableEnvironment(streamingEnvironment);

  final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
  tableEnv.registerDataStream("structStream", structStream);
  tableEnv.scan("structStream").printSchema();

  for (int i = 0; i < 100; i++) {
    for (String query : Queries.sample) {
      // Queries.sample has one query, shown above.
      Table selectQuery = tableEnv.sqlQuery(query);

      DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class);
      selectQueryStream.print();
    }
  }

  // execute program
  streamingEnvironment.execute("Kafka Streaming SQL");
}

private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment)
    throws Exception {
  Properties properties = getKafkaProperties();
  // TestDeserializer deserializes the JSON to a Row of string columns (515)
  // and also adds a column for the raw message.
  FlinkKafkaConsumer011<Row> consumer = new FlinkKafkaConsumer011<>(
      KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
  DataStream<Row> stream = environment.addSource(consumer);

  return stream;
}

private static RowTypeInfo getRowTypeInfo() throws Exception {
  // This has 515 fields.
  List<String> fieldNames = DDIManager.getDDIFieldNames();
  fieldNames.add("rawkafka"); // raw message added by TestDeserializer
  fieldNames.add("proctime");

  // Fill typeInformationArray with StringType for all but the last field, which is of type Time
  .
  return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

  env.enableCheckpointing(6);
  env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
  env.setParallelism(725);
  return env;
}
{code}

  was:
With a high number of Flink SQL queries (100 of the query below), the Flink 
command-line client fails with "JobManager did not respond within 60 ms" on a 
YARN cluster.
 * The JobManager log has nothing after the last TaskManager started except DEBUG 
entries with "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in 
JobManager", indicating it is likely stuck (creating the ExecutionGraph?).
 * The same code works as a standalone Java program locally (high CPU initially).
 * Note: each Row in structStream contains 515 columns (many end up null), 
including a column that holds the raw message.
 * In the YARN cluster we specify 18GB for each TaskManager, 18GB for the 
JobManager, 5 slots each, and a parallelism of 725 (the number of partitions in 
our Kafka source).

*Query:*
{code:java}
 select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp, 
EventTimestamp, RawMsg, Source 
 from structStream
 where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' 
and Outcome='Success'
 group by tumble(proctime, INTERVAL '1' SECOND), Environment, 
CollectedTimestamp, EventTimestamp, RawMsg, Source
{code}
*Code:*
{code:java}
public static void main(String[] args) throws Exception {
 
FileSystems.newFileSystem(KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(),
 new HashMap<>());

 final StreamExecutionEnvironment