[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)
[ https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438223#comment-16438223 ] ASF GitHub Bot commented on FLINK-8910: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r181540905 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java --- @@ -107,4 +131,12 @@ public int getAttemptNumber() { public String getTaskNameWithSubtasks() { return this.taskNameWithSubtasks; } + + /** +* Returns the allocation id for where this task is executed. +* @return the allocation id for where this task is executed. +*/ + public String getAllocationID() { --- End diff -- How about this way: set up the cluster with one slot per TM, and use the `host:pid` to act as the "`allocationID`"? > Introduce automated end-to-end test for local recovery (including sticky > scheduling) > > > Key: FLINK-8910 > URL: https://issues.apache.org/jira/browse/FLINK-8910 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > We should have an automated end-to-end test that can run nightly to check > that sticky allocation and local recovery work as expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5676#discussion_r181540905 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java --- @@ -107,4 +131,12 @@ public int getAttemptNumber() { public String getTaskNameWithSubtasks() { return this.taskNameWithSubtasks; } + + /** +* Returns the allocation id for where this task is executed. +* @return the allocation id for where this task is executed. +*/ + public String getAllocationID() { --- End diff -- How about this way: set up the cluster with one slot per TM, and use the `host:pid` to act as the "`allocationID`"? ---
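The review suggestion above can be sketched in plain Java. This is a minimal, hypothetical illustration (the class name `PseudoAllocationId` is invented here, not part of the PR): with one slot per TaskManager, each task's `host:pid` pair uniquely identifies the slot it runs in, so an end-to-end test could use it in place of the real allocation ID. Note that the `pid@host` shape of `RuntimeMXBean#getName()` is a HotSpot convention, not a guarantee.

```java
import java.lang.management.ManagementFactory;
import java.net.InetAddress;

public class PseudoAllocationId {

    // Build a "host:pid" string. With one slot per TaskManager, this pair
    // uniquely identifies the slot a task runs in, so an end-to-end test
    // could compare it across restarts to verify sticky scheduling.
    public static String hostAndPid() throws Exception {
        String host = InetAddress.getLocalHost().getHostName();
        // On HotSpot JVMs the runtime name is conventionally "pid@host".
        String jvmName = ManagementFactory.getRuntimeMXBean().getName();
        String pid = jvmName.split("@")[0];
        return host + ":" + pid;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(hostAndPid());
    }
}
```

A test harness along these lines could emit this string from each subtask before and after an induced failure, and assert that the mapping from subtask index to `host:pid` is unchanged after recovery.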
[jira] [Updated] (FLINK-9173) RestClient - Received response is abnormal
[ https://issues.apache.org/jira/browse/FLINK-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Lau updated FLINK-9173: --- Description: The system prints the exception log as follows: {code:java} // code placeholder 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR o.a.flink.runtime.rest.RestClient - Received response was neither of the expected type ([simple type, class org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) nor an error. Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968 org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "status" (class org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as ignorable (one known property: "errors"]) at [Source: N/A; line: -1, column: -1] (through reference chain: org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"]) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127) at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3779) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2050) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2547) at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:210) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} In a development environment such as Eclipse Luna, the job can be submitted to the standalone cluster via the Spring Boot application's main method, but mvn spring-boot:run throws this exception. The local operating system is Mac OS X; the JDK version is 1.8.0_151. was: The system prints the exception log as follows: {code:java} // code placeholder 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR o.a.flink.runtime.rest.RestClient - Received response was neither of the expected type ([simple type, class org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) nor an error. 
Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968 org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "status" (class org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as ignorable (one known property: "errors"]) at [Source: N/A; line: -1, column: -1] (through reference chain: org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"]) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392) at
[jira] [Updated] (FLINK-9173) RestClient - Received response is abnormal
[ https://issues.apache.org/jira/browse/FLINK-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Lau updated FLINK-9173: --- Component/s: flink-shaded.git > RestClient - Received response is abnormal > -- > > Key: FLINK-9173 > URL: https://issues.apache.org/jira/browse/FLINK-9173 > Project: Flink > Issue Type: Bug > Components: flink-shaded.git, Job-Submission, REST >Affects Versions: 1.5.0 > Environment: OS: CentOS 6.8 > JAVA: 1.8.0_161-b12 > maven-plugin: spring-boot-maven-plugin > Spring-boot: 1.5.10.RELEASE >Reporter: Bob Lau >Priority: Major > > The system prints the exception log as follows: > > {code:java} > // code placeholder > 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR > o.a.flink.runtime.rest.RestClient - Received response was neither of the > expected type ([simple type, class > org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) > nor an error. > Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968 > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: > Unrecognized field "status" (class > org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as > ignorable (one known property: "errors"]) > at [Source: N/A; line: -1, column: -1] (through reference chain: > org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"]) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392) > at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3779) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2050) > at > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2547) > at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:210) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > > In a development environment such as Eclipse Luna, > the job can be submitted to the standalone cluster. > The local operating system is Mac OS X; the JDK version is 1.8.0_151. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9173) RestClient - Received response is abnormal
Bob Lau created FLINK-9173: -- Summary: RestClient - Received response is abnormal Key: FLINK-9173 URL: https://issues.apache.org/jira/browse/FLINK-9173 Project: Flink Issue Type: Bug Components: Job-Submission, REST Affects Versions: 1.5.0 Environment: OS: CentOS 6.8 JAVA: 1.8.0_161-b12 maven-plugin: spring-boot-maven-plugin Spring-boot: 1.5.10.RELEASE Reporter: Bob Lau The system prints the exception log as follows: {code:java} // code placeholder 09:07:20.755 tysc_log [Flink-RestClusterClient-IO-thread-4] ERROR o.a.flink.runtime.rest.RestClient - Received response was neither of the expected type ([simple type, class org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) nor an error. Response=org.apache.flink.runtime.rest.RestClient$JsonResponse@2ac43968 org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "status" (class org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as ignorable (one known property: "errors"]) at [Source: N/A; line: -1, column: -1] (through reference chain: org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"]) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:851) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1085) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1392) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1346) at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:455) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1127) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3779) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2050) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2547) at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:210) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
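The stack trace above is Jackson's standard reaction to an unexpected JSON field: `ErrorResponseBody` declares only an `errors` property, so a payload that also carries `status` fails with `UnrecognizedPropertyException`. The mechanism can be reproduced with a small sketch, assuming unshaded jackson-databind on the classpath and using a hypothetical `ErrorBody` class as a stand-in for Flink's `ErrorResponseBody` (this illustrates the failure mode only, not Flink's actual fix):

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;

import java.util.List;

public class UnknownFieldDemo {

    // A stand-in for ErrorResponseBody: it only knows the "errors" property.
    public static class ErrorBody {
        public List<String> errors;
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"status\":{\"id\":\"COMPLETED\"},\"errors\":[]}";

        // By default Jackson fails on unknown properties, exactly as in the log.
        ObjectMapper strict = new ObjectMapper();
        try {
            strict.readValue(json, ErrorBody.class);
        } catch (UnrecognizedPropertyException e) {
            System.out.println("strict mapper failed: " + e.getPropertyName()); // prints: strict mapper failed: status
        }

        // A tolerant mapper skips the extra field instead of failing.
        ObjectMapper lenient = new ObjectMapper()
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        ErrorBody body = lenient.readValue(json, ErrorBody.class);
        System.out.println("lenient mapper errors: " + body.errors);
    }
}
```

Whether the right fix is aligning the client and server message schemas or relaxing the parser is exactly what the issue is about; the sketch only shows why the strict parser rejects the response.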
[jira] [Commented] (FLINK-9157) Support Apache HCatalog in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438151#comment-16438151 ] Rong Rong commented on FLINK-9157: -- Hi [~dmitry_kober], yes, that would be great! I think for now feel free to take a look at the related tasks. It is currently blocked by FLINK-9170. Once the integration with the Table/SQL API is done, we can probably start with the HCatalog integration. > Support Apache HCatalog in SQL client > - > > Key: FLINK-9157 > URL: https://issues.apache.org/jira/browse/FLINK-9157 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > Having the SQL Client support its first external catalog, Apache HCatalog, out of > the box. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9172) Support external catalog factory that comes default with SQL-Client
[ https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9172: - Issue Type: Sub-task (was: New Feature) Parent: FLINK-7594 > Support external catalog factory that comes default with SQL-Client > --- > > Key: FLINK-9172 > URL: https://issues.apache.org/jira/browse/FLINK-9172 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Priority: Major > > It would be great to have the SQL Client support some external catalogs > out of the box for SQL users to configure and utilize easily. I am currently > thinking of having an external catalog factory that spins up both streaming and > batch external catalog table sources and sinks. This could greatly unify and > provide easy access for SQL users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9172) Support external catalog factory that comes default with SQL-Client
Rong Rong created FLINK-9172: Summary: Support external catalog factory that comes default with SQL-Client Key: FLINK-9172 URL: https://issues.apache.org/jira/browse/FLINK-9172 Project: Flink Issue Type: New Feature Reporter: Rong Rong It would be great to have the SQL Client support some external catalogs out of the box for SQL users to configure and utilize easily. I am currently thinking of having an external catalog factory that spins up both streaming and batch external catalog table sources and sinks. This could greatly unify and provide easy access for SQL users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9157) Support Apache HCatalog in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9157: - Description: Having SQL-Client to support first external catalog: Apache HCatalog out of the box. (was: It will be great to have SQL-Client to support some external catalogs out-of-the-box for SQL users to configure and utilize easily. Such as Apache HCatalog. ) > Support Apache HCatalog in SQL client > - > > Key: FLINK-9157 > URL: https://issues.apache.org/jira/browse/FLINK-9157 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > Having SQL-Client to support first external catalog: Apache HCatalog out of > the box. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438133#comment-16438133 ] ASF GitHub Bot commented on FLINK-8370: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181534458 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.UnionIterator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values
+ * for a set of metrics.
+ *
+ * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges.
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5}
+ */
+public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> {
+
+	public AggregatingSubtasksMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			Executor executor,
+			MetricFetcher fetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher);
+	}
+
+	@Override
+	Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) {
+		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+		JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+		Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class);
+		if (subtaskRanges.isEmpty()) {
+			return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores();
+		} else {
+			Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges);
+			Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
+			for (int subtask : subtasks) {
+				subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask));
+			}
+			return subtaskStores;
+		}
+	}
+
+	private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
+		UnionIterator<Integer> iterators = new UnionIterator<>();
+
+		for (String rawRange : ranges) {
+			try {
+				Iterator<Integer> rangeIterator;
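The `getIntegerRangeFromString` method the diff ends on expands range expressions such as `0-2,4-5` into the individual subtask indices they cover. Below is a simplified, self-contained sketch of that expansion; `SubtaskRanges` is an invented name, and the real handler streams the ranges through a `UnionIterator` rather than materializing a list:

```java
import java.util.ArrayList;
import java.util.List;

/** Expands comma-separated integer ranges such as "0-2,4-5" or "7" into individual indices. */
class SubtaskRanges {
    static List<Integer> parse(String ranges) {
        List<Integer> result = new ArrayList<>();
        for (String rawRange : ranges.split(",")) {
            int dash = rawRange.indexOf('-');
            // A bare number like "7" is a range of length one.
            int start = dash < 0
                ? Integer.parseInt(rawRange)
                : Integer.parseInt(rawRange.substring(0, dash));
            int end = dash < 0
                ? start
                : Integer.parseInt(rawRange.substring(dash + 1));
            for (int i = start; i <= end; i++) {
                result.add(i);
            }
        }
        return result;
    }
}
```

So `SubtaskRanges.parse("0-2,4-5")` yields the indices 0, 1, 2, 4, 5, which the handler then uses to look up the per-subtask metric stores.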
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438130#comment-16438130 ] ASF GitHub Bot commented on FLINK-8370: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181534395 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java --- @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +/** + * An interface for accumulating double values. + */ +interface DoubleAccumulator { --- End diff -- ah yes, I copied it at the start but never got around to deleting the old one.. > Port AbstractAggregatingMetricsHandler to RestServerEndpoint > > > Key: FLINK-8370 > URL: https://issues.apache.org/jira/browse/FLINK-8370 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Port subclasses of > {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractAggregatingMetricsHandler}} > to new FLIP-6 {{RestServerEndpoint}}. 
> The following handlers need to be migrated: > * {{AggregatingJobsMetricsHandler}} > * {{AggregatingSubtasksMetricsHandler}} > * {{AggregatingTaskManagersMetricsHandler}} > New handlers should then be registered in {{WebMonitorEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
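For context, an accumulator interface like the `DoubleAccumulator` under discussion can be sketched as follows. `DoubleAcc` and `DoubleMax` are illustrative stand-ins; the real interface in `DoubleAccumulator.java` may differ in method names:

```java
/** Sketch of an interface for accumulating double values, one instance per aggregation. */
interface DoubleAcc {
    /** Folds another value into the running aggregate. */
    void add(double value);

    /** Returns the aggregated result so far. */
    double getValue();
}

/** Accumulator tracking the maximum of all added values. */
class DoubleMax implements DoubleAcc {
    private double max = Double.NEGATIVE_INFINITY;

    @Override
    public void add(double value) {
        max = Math.max(max, value);
    }

    @Override
    public double getValue() {
        return max;
    }
}
```

The aggregating handlers would keep one such accumulator per requested aggregation (min, max, sum, avg) and feed it the metric value of each subtask or task manager.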
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438129#comment-16438129 ] ASF GitHub Bot commented on FLINK-8955: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181534220 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -49,15 +54,17 @@ * Starts a Flink mini cluster as a resource and registers the respective * ExecutionEnvironment and StreamExecutionEnvironment. */ -public class MiniClusterResource extends ExternalResource { +public class MiniClusterResource extends ExternalResource implements JobExecutor { --- End diff -- otherwise we have to revert to using a `MiniCluster` directly, but then we lose the `@Rule` benefits. > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438128#comment-16438128 ] ASF GitHub Bot commented on FLINK-8955: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181534097 --- Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java --- @@ -196,21 +188,23 @@ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient. PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)); - TestStreamEnvironment.setAsContext( - testCluster, + TestEnvironment.setAsContext( + MINI_CLUSTER_RESOURCE, parallelism, Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)), Collections.emptyList()); - // Program should terminate with a 'SuccessException': - // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar. - expectedException.expectCause( - Matchers.hasProperty("cause", - hasProperty("class", - hasProperty("canonicalName", equalTo( - "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"); - - streamingCheckpointedProg.invokeInteractiveModeForExecution(); + try { + streamingCheckpointedProg.invokeInteractiveModeForExecution(); + } catch (Exception e) { + // Program should terminate with a 'SuccessException': + // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar. 
+ Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")); --- End diff -- Note that this test is a bit funky at the moment; it should not be possible to access the exception at all since it shouldn't be deserializable. I had one version where the predicate version of `findThrowable` actually failed since it couldn't deserialize it, but this issue for some reason disappeared after a rebase. > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
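The predicate variant of `ExceptionUtils.findThrowable` used in the diff walks the cause chain and returns the first throwable matching the predicate. A minimal stand-in of that idea, for illustration (the actual Flink utility additionally guards against cyclic cause chains):

```java
import java.util.Optional;
import java.util.function.Predicate;

/** Simplified cause-chain search in the spirit of ExceptionUtils.findThrowable. */
class Causes {
    static Optional<Throwable> find(Throwable top, Predicate<Throwable> matcher) {
        // Walk from the top-level exception down through its causes.
        for (Throwable t = top; t != null; t = t.getCause()) {
            if (matcher.test(t)) {
                return Optional.of(t);
            }
        }
        return Optional.empty();
    }
}
```

Matching on the canonical class name, as the test does, lets the check succeed even when the exception class itself lives only in the user jar and cannot be loaded by the test's classloader.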
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438125#comment-16438125 ] ASF GitHub Bot commented on FLINK-8955: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181533944 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() { @Override public void before() throws Exception { + miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get()); --- End diff -- That's not "easy", and it doesn't even solve the issue. The contained `TemporaryFolder` would only be available after the construction of the `MiniClusterResource`, yet the folder is necessary to create the configuration of the `MiniClusterResource`. Naturally we would like both of these to be `Rules`, but that's not possible unless the configuration is generated lazily, as I propose in the PR. Due to the `RuleChain`, by the time `MiniClusterResource#before` is called we can already access the `TemporaryFolder`. The alternative is for the `TemporaryFolder` to be set up in a `@BeforeClass` method which is then used for the configuration. That would totally work, but it is really unfortunate, as setting temporary paths is rather common and required for virtually all tests that use file-based state backends. I agree that my proposal is quite hacky, but the question we have to ask is whether we prefer more complexity in the tests or in the `MiniClusterResource`, and I would prefer the latter.
> Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
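The lazy-configuration approach described above boils down to handing the resource a `Supplier` and resolving it only in `before()`, by which point outer rules such as the `TemporaryFolder` have been initialized. A minimal sketch with invented class names, omitting the JUnit plumbing:

```java
import java.util.function.Supplier;

/** Sketch of a rule-like resource whose configuration is produced lazily. */
class LazyResource {
    private final Supplier<String> configSupplier;
    private String config; // resolved lazily, not at construction time

    LazyResource(Supplier<String> configSupplier) {
        this.configSupplier = configSupplier;
    }

    void before() {
        // The supplier runs here, so it can safely read state (e.g. a temp
        // folder path) that only became available after construction.
        config = configSupplier.get();
    }

    String getConfig() {
        return config;
    }
}
```

Because the supplier is not invoked until `before()`, the resource can be constructed as a static rule while its configuration still depends on another rule that runs earlier in the `RuleChain`.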
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438119#comment-16438119 ] ASF GitHub Bot commented on FLINK-8955: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181533175 --- Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java --- @@ -196,21 +188,23 @@ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient. PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)); - TestStreamEnvironment.setAsContext( - testCluster, + TestEnvironment.setAsContext( + MINI_CLUSTER_RESOURCE, parallelism, Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)), Collections.emptyList()); - // Program should terminate with a 'SuccessException': - // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar. - expectedException.expectCause( - Matchers.hasProperty("cause", - hasProperty("class", - hasProperty("canonicalName", equalTo( - "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"); - - streamingCheckpointedProg.invokeInteractiveModeForExecution(); + try { + streamingCheckpointedProg.invokeInteractiveModeForExecution(); + } catch (Exception e) { + // Program should terminate with a 'SuccessException': + // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar. + Optional expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")); --- End diff -- Technically that shouldn't be possible. 
The exception class is part of the user-jar which is not on the classpath of the test. > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438117#comment-16438117 ] ASF GitHub Bot commented on FLINK-8955: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181532978 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -49,15 +54,17 @@ * Starts a Flink mini cluster as a resource and registers the respective * ExecutionEnvironment and StreamExecutionEnvironment. */ -public class MiniClusterResource extends ExternalResource { +public class MiniClusterResource extends ExternalResource implements JobExecutor { --- End diff -- so that we can pass the MiniClusterResource to `TestEnvironment.setAsContext()` > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9157) Support Apache HCatalog in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9157: -- Summary: Support Apache HCatalog in SQL client (was: Support for commonly used external catalog) > Support Apache HCatalog in SQL client > - > > Key: FLINK-9157 > URL: https://issues.apache.org/jira/browse/FLINK-9157 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > It will be great to have SQL-Client to support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. Such as Apache > HCatalog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-1466) Add InputFormat to read HCatalog tables
[ https://issues.apache.org/jira/browse/FLINK-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-1466: -- Issue Type: Sub-task (was: New Feature) Parent: FLINK-9171 > Add InputFormat to read HCatalog tables > --- > > Key: FLINK-1466 > URL: https://issues.apache.org/jira/browse/FLINK-1466 > Project: Flink > Issue Type: Sub-task > Components: Java API, Scala API >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Minor > > HCatalog is a metadata repository and InputFormat to make Hive tables > accessible to other frameworks such as Pig. > Adding support for HCatalog would give access to Hive managed data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9157) Support for commonly used external catalog
[ https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9157: -- Issue Type: Bug (was: Sub-task) Parent: (was: FLINK-7594) > Support for commonly used external catalog > -- > > Key: FLINK-9157 > URL: https://issues.apache.org/jira/browse/FLINK-9157 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > It will be great to have SQL-Client to support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. Such as Apache > HCatalog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9157) Support for commonly used external catalog
[ https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9157: -- Issue Type: Sub-task (was: Bug) Parent: FLINK-9171 > Support for commonly used external catalog > -- > > Key: FLINK-9157 > URL: https://issues.apache.org/jira/browse/FLINK-9157 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > It would be great for the SQL Client to support some external catalogs, such as > Apache HCatalog, out-of-the-box so that SQL users can configure and use them easily. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-1913) Document how to access data in HCatalog
[ https://issues.apache.org/jira/browse/FLINK-1913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-1913: -- Issue Type: Sub-task (was: Bug) Parent: FLINK-9171 > Document how to access data in HCatalog > --- > > Key: FLINK-1913 > URL: https://issues.apache.org/jira/browse/FLINK-1913 > Project: Flink > Issue Type: Sub-task > Components: Documentation, flink-hcatalog >Reporter: Robert Metzger >Assignee: Zhenqiu Huang >Priority: Major > > Reading from HCatalog was added in FLINK-1466, but not documented > We should document how to use the code in {{flink-hcatalog}}. > Also, there should be an example on how to write to HCatalog using the Hadoop > wrappers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9170) HCatolog integration with Table/SQL API
[ https://issues.apache.org/jira/browse/FLINK-9170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9170: -- Issue Type: Sub-task (was: New Feature) Parent: FLINK-9171 > HCatolog integration with Table/SQL API > --- > > Key: FLINK-9170 > URL: https://issues.apache.org/jira/browse/FLINK-9170 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Shuyi Chen >Assignee: Zhenqiu Huang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9171) Flink HCatolog integration
Shuyi Chen created FLINK-9171: - Summary: Flink HCatolog integration Key: FLINK-9171 URL: https://issues.apache.org/jira/browse/FLINK-9171 Project: Flink Issue Type: Task Reporter: Shuyi Chen This is a parent task for all HCatalog related integration in Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9170) HCatolog integration with Table/SQL API
Shuyi Chen created FLINK-9170: - Summary: HCatolog integration with Table/SQL API Key: FLINK-9170 URL: https://issues.apache.org/jira/browse/FLINK-9170 Project: Flink Issue Type: New Feature Components: Table API SQL Reporter: Shuyi Chen Assignee: Zhenqiu Huang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-9169: -- Assignee: Tzu-Li (Gordon) Tai > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. This in itself is not the problem; however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, > where we null-check the state serializer. This then fails with an > unhelpful NPE. > I think the same should happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
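The fix direction implied by the issue — failing fast with a descriptive message instead of a bare NPE from `Preconditions.checkNotNull` — can be sketched in plain Java. Note this is an illustrative sketch, not Flink's actual code; the class and message below are hypothetical:

```java
public class DescriptiveChecks {

    /** Hypothetical stand-in for a precondition that names the missing component. */
    static <T> T checkSerializerRestored(T serializer, String stateName) {
        if (serializer == null) {
            // A targeted message beats a bare NullPointerException: it tells the
            // user *which* state's serializer could not be deserialized.
            throw new IllegalStateException(
                "The serializer for state '" + stateName + "' could not be deserialized "
                    + "from the savepoint. The serializer class is likely missing from "
                    + "the user code classpath.");
        }
        return serializer;
    }

    public static void main(String[] args) {
        try {
            checkSerializerRestored(null, "my-keyed-state");
        } catch (IllegalStateException e) {
            // The failure now points at the offending state instead of an NPE
            // deep inside the restore path.
            System.out.println(e.getMessage());
        }
    }
}
```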
[jira] [Resolved] (FLINK-9103) SSL verification on TaskManager when parallelism > 1
[ https://issues.apache.org/jira/browse/FLINK-9103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9103. -- Resolution: Fixed Fix Version/s: 1.4.3 1.5.0 Fixed via master: ffb03821ff118b0949d7d42d6b67312ee8732c2b 1.5.0: 688630c6432dd3318936613a3f657f7de475fce7 1.4.3: e76b10d07c657bcf3250ca08b5649c6a242bb01f > SSL verification on TaskManager when parallelism > 1 > > > Key: FLINK-9103 > URL: https://issues.apache.org/jira/browse/FLINK-9103 > Project: Flink > Issue Type: Bug > Components: Docker, Network, Security >Affects Versions: 1.4.0 >Reporter: Edward Rojas >Assignee: Edward Rojas >Priority: Major > Fix For: 1.5.0, 1.4.3 > > Attachments: job.log, task0.log > > > In dynamic environments like Kubernetes, SSL certificates may be generated > with only DNS names for validating the identity of servers, given that the > IP can change over time. > > In these cases, when executing jobs with parallelism set to 1, SSL > validation succeeds and the JobManager can communicate with the TaskManager and > vice versa. > > But with parallelism set to more than 1, SSL validation fails when > TaskManagers communicate with each other, as validation appears to be performed > against the IP address: > Caused by: java.security.cert.CertificateException: No subject alternative > names matching IP address 172.xx.xxx.xxx found > at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168) > at sun.security.util.HostnameChecker.match(HostnameChecker.java:94) > at > sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455) > > at > sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436) > > at > sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252) > > at > sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136) > > at > sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1601) > > ... 
21 more > > From the logs, it seems the task managers successfully register their full > addresses with Netty, but the IP is still used. > > Attached pertinent logs from JobManager and a TaskManager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9103) SSL verification on TaskManager when parallelism > 1
[ https://issues.apache.org/jira/browse/FLINK-9103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-9103: Assignee: Edward Rojas > SSL verification on TaskManager when parallelism > 1 > > > Key: FLINK-9103 > URL: https://issues.apache.org/jira/browse/FLINK-9103 > Project: Flink > Issue Type: Bug > Components: Docker, Network, Security >Affects Versions: 1.4.0 >Reporter: Edward Rojas >Assignee: Edward Rojas >Priority: Major > Attachments: job.log, task0.log > > > In dynamic environments like Kubernetes, SSL certificates may be generated > with only DNS names for validating the identity of servers, given that the > IP can change over time. > > In these cases, when executing jobs with parallelism set to 1, SSL > validation succeeds and the JobManager can communicate with the TaskManager and > vice versa. > > But with parallelism set to more than 1, SSL validation fails when > TaskManagers communicate with each other, as validation appears to be performed > against the IP address: > Caused by: java.security.cert.CertificateException: No subject alternative > names matching IP address 172.xx.xxx.xxx found > at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168) > at sun.security.util.HostnameChecker.match(HostnameChecker.java:94) > at > sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455) > > at > sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436) > > at > sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252) > > at > sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136) > > at > sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1601) > > ... 21 more > > From the logs, it seems the task managers successfully register their full > addresses with Netty, but the IP is still used. > > Attached pertinent logs from JobManager and a TaskManager. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9103) SSL verification on TaskManager when parallelism > 1
[ https://issues.apache.org/jira/browse/FLINK-9103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437853#comment-16437853 ] ASF GitHub Bot commented on FLINK-9103: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5789 > SSL verification on TaskManager when parallelism > 1 > > > Key: FLINK-9103 > URL: https://issues.apache.org/jira/browse/FLINK-9103 > Project: Flink > Issue Type: Bug > Components: Docker, Network, Security >Affects Versions: 1.4.0 >Reporter: Edward Rojas >Assignee: Edward Rojas >Priority: Major > Attachments: job.log, task0.log > > > In dynamic environments like Kubernetes, SSL certificates may be generated > with only DNS names for validating the identity of servers, given that the > IP can change over time. > > In these cases, when executing jobs with parallelism set to 1, SSL > validation succeeds and the JobManager can communicate with the TaskManager and > vice versa. > > But with parallelism set to more than 1, SSL validation fails when > TaskManagers communicate with each other, as validation appears to be performed > against the IP address: > Caused by: java.security.cert.CertificateException: No subject alternative > names matching IP address 172.xx.xxx.xxx found > at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168) > at sun.security.util.HostnameChecker.match(HostnameChecker.java:94) > at > sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455) > > at > sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436) > > at > sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252) > > at > sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136) > > at > sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1601) > > ... 
21 more > > From the logs, it seems the task managers successfully register their full > addresses with Netty, but the IP is still used. > > Attached pertinent logs from JobManager and a TaskManager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5789: [FLINK-9103] Using CanonicalHostName instead of IP...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5789 ---
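The PR title above ("Using CanonicalHostName instead of IP...") points at the shape of the fix: advertise a DNS name rather than the raw IP, so certificates whose subject alternative names contain only DNS entries can pass hostname verification. A minimal, self-contained illustration of the JDK calls involved — this only sketches the idea and is not the actual Flink change:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class CanonicalHostExample {

    /** Resolve the canonical (DNS) name for a host, falling back to the IP text. */
    static String canonicalOf(String host) throws UnknownHostException {
        InetAddress addr = InetAddress.getByName(host);
        // getCanonicalHostName() performs a reverse lookup and returns a DNS
        // name where one exists; if none does, it returns the textual IP.
        return addr.getCanonicalHostName();
    }

    public static void main(String[] args) throws UnknownHostException {
        // Matching the raw IP (addr.getHostAddress()) requires an "IP Address"
        // SAN entry in the certificate -- the failure mode in this issue.
        // A DNS name, by contrast, can match a DNS-only SAN entry.
        System.out.println("127.0.0.1 -> " + canonicalOf("127.0.0.1"));
    }
}
```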
[jira] [Commented] (FLINK-8872) Yarn detached mode via -yd does not detach
[ https://issues.apache.org/jira/browse/FLINK-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1643#comment-1643 ] ASF GitHub Bot commented on FLINK-8872: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5672 > Yarn detached mode via -yd does not detach > -- > > Key: FLINK-8872 > URL: https://issues.apache.org/jira/browse/FLINK-8872 > Project: Flink > Issue Type: Bug > Components: Client, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Running yarn per-job cluster in detached mode currently does not work and > waits for the job to finish. > Example: > {code} > ./bin/flink run -m yarn-cluster -yn 10 -yjm 768 -ytm 3072 -ys 2 -yd -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input > {code} > Output in case of an infinite program would then end with something like this: > {code} > 2018-03-05 13:41:23,311 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for > the cluster to be allocated > 2018-03-05 13:41:23,313 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying > cluster, current state ACCEPTED > 2018-03-05 13:41:28,342 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN > application has been deployed successfully. > 2018-03-05 13:41:28,343 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink > YARN client has been started in detached mode. In order to stop Flink on > YARN, use the following command or a YARN web interface to stop it: > yarn application -kill application_1519984124671_0006 > Please also note that the temporary files of the YARN session in the home > directoy will not be removed. > Starting execution of program > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5672: [FLINK-8872][flip6] fix yarn detached mode command...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5672 ---
[jira] [Resolved] (FLINK-8872) Yarn detached mode via -yd does not detach
[ https://issues.apache.org/jira/browse/FLINK-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8872. -- Resolution: Fixed Fixed via master: fdd1c6ed0fed229612ecde1565d90a06dbe6ff55 1.5.0: 81e1e4c312b467dc64c664e30a7132a9f7d55140 > Yarn detached mode via -yd does not detach > -- > > Key: FLINK-8872 > URL: https://issues.apache.org/jira/browse/FLINK-8872 > Project: Flink > Issue Type: Bug > Components: Client, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Running yarn per-job cluster in detached mode currently does not work and > waits for the job to finish. > Example: > {code} > ./bin/flink run -m yarn-cluster -yn 10 -yjm 768 -ytm 3072 -ys 2 -yd -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input > {code} > Output in case of an infinite program would then end with something like this: > {code} > 2018-03-05 13:41:23,311 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for > the cluster to be allocated > 2018-03-05 13:41:23,313 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying > cluster, current state ACCEPTED > 2018-03-05 13:41:28,342 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN > application has been deployed successfully. > 2018-03-05 13:41:28,343 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink > YARN client has been started in detached mode. In order to stop Flink on > YARN, use the following command or a YARN web interface to stop it: > yarn application -kill application_1519984124671_0006 > Please also note that the temporary files of the YARN session in the home > directoy will not be removed. > Starting execution of program > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437682#comment-16437682 ] ASF GitHub Bot commented on FLINK-8955: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181467478 --- Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java --- @@ -196,21 +188,23 @@ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient. PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)); - TestStreamEnvironment.setAsContext( - testCluster, + TestEnvironment.setAsContext( + MINI_CLUSTER_RESOURCE, parallelism, Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)), Collections.emptyList()); - // Program should terminate with a 'SuccessException': - // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar. - expectedException.expectCause( - Matchers.hasProperty("cause", - hasProperty("class", - hasProperty("canonicalName", equalTo( - "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"); - - streamingCheckpointedProg.invokeInteractiveModeForExecution(); + try { + streamingCheckpointedProg.invokeInteractiveModeForExecution(); + } catch (Exception e) { + // Program should terminate with a 'SuccessException': + // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar. 
+ Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")); --- End diff -- Can't we use `findThrowable(Throwable throwable, Class<T> searchType)`? > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437679#comment-16437679 ] ASF GitHub Bot commented on FLINK-8955: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181466728 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() { @Override public void before() throws Exception { + miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get()); --- End diff -- I think we are mixing concerns by letting the `MiniClusterResource` change its behaviour across multiple tests. I think we should not pass in a configuration supplier which can lazily instantiate a configuration. Instead if you need a different `MiniClusterResource` per test, then the test should instantiate the respective `MiniClusterResource` with the proper configuration. > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437680#comment-16437680 ] ASF GitHub Bot commented on FLINK-8955: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181465966 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -225,6 +247,11 @@ private void startMiniCluster() throws Exception { } } + @Override + public JobExecutionResult executeJobBlocking(JobGraph jobGraph) throws JobExecutionException, InterruptedException { --- End diff -- I think we should use the `MiniClusterClient` for job submission. The `JobExecutor` interface is only an internal interface to bridge between the two different mini clusters. > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437681#comment-16437681 ] ASF GitHub Bot commented on FLINK-8955: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181467950 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() { @Override public void before() throws Exception { + miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get()); --- End diff -- The same applies to rule chains. An easy solution would be to create a `MiniClusterWithTemporaryFolderResource` which encapsulates this logic. > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437683#comment-16437683 ] ASF GitHub Bot commented on FLINK-8955: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181468061 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -49,15 +54,17 @@ * Starts a Flink mini cluster as a resource and registers the respective * ExecutionEnvironment and StreamExecutionEnvironment. */ -public class MiniClusterResource extends ExternalResource { +public class MiniClusterResource extends ExternalResource implements JobExecutor { --- End diff -- Why do we need to implement `JobExecutor`? > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181467478 --- Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java --- @@ -196,21 +188,23 @@ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient. PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)); - TestStreamEnvironment.setAsContext( - testCluster, + TestEnvironment.setAsContext( + MINI_CLUSTER_RESOURCE, parallelism, Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)), Collections.emptyList()); - // Program should terminate with a 'SuccessException': - // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar. - expectedException.expectCause( - Matchers.hasProperty("cause", - hasProperty("class", - hasProperty("canonicalName", equalTo( - "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"); - - streamingCheckpointedProg.invokeInteractiveModeForExecution(); + try { + streamingCheckpointedProg.invokeInteractiveModeForExecution(); + } catch (Exception e) { + // Program should terminate with a 'SuccessException': + // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar. + Optional expectedCause = ExceptionUtils.findThrowable(e, + candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")); --- End diff -- Can't we use `findThrowable(Throwable throwable, Class searchType)`? ---
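For context, the helper the reviewer suggests walks the cause chain and matches by type instead of comparing class names as strings. A self-contained approximation (Flink's real `ExceptionUtils.findThrowable` also guards against cyclic cause chains, which this sketch omits):

```java
import java.util.Optional;

public class FindThrowableSketch {

    /** Simplified version of ExceptionUtils.findThrowable(Throwable, Class). */
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Throwable t = throwable;
        while (t != null) {
            if (searchType.isInstance(t)) {
                return Optional.of(searchType.cast(t));
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Exception wrapped = new RuntimeException(new IllegalStateException("success marker"));
        // Matching by Class avoids the string comparison on getCanonicalName()
        // criticized in the review comment above (but note: type matching only
        // works if the class is on the test's classpath).
        System.out.println(findThrowable(wrapped, IllegalStateException.class).isPresent()); // prints "true"
    }
}
```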
[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181465966 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -225,6 +247,11 @@ private void startMiniCluster() throws Exception { } } + @Override + public JobExecutionResult executeJobBlocking(JobGraph jobGraph) throws JobExecutionException, InterruptedException { --- End diff -- I think we should use the `MiniClusterClient` for job submission. The `JobExecutor` interface is only an internal interface to bridge between the two different mini clusters. ---
[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181467950 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() { @Override public void before() throws Exception { + miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get()); --- End diff -- The same applies to rule chains. An easy solution would be to create a `MiniClusterWithTemporaryFolderResource` which encapsulates this logic. ---
[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181466728 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -117,6 +138,7 @@ public TestEnvironment getTestEnvironment() { @Override public void before() throws Exception { + miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfigurationSupplier.get()); --- End diff -- I think we are mixing concerns by letting the `MiniClusterResource` change its behaviour across multiple tests. I think we should not pass in a configuration supplier which can lazily instantiate a configuration. Instead if you need a different `MiniClusterResource` per test, then the test should instantiate the respective `MiniClusterResource` with the proper configuration. ---
[GitHub] flink pull request #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5780#discussion_r181468061 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -49,15 +54,17 @@ * Starts a Flink mini cluster as a resource and registers the respective * ExecutionEnvironment and StreamExecutionEnvironment. */ -public class MiniClusterResource extends ExternalResource { +public class MiniClusterResource extends ExternalResource implements JobExecutor { --- End diff -- Why do we need to implement `JobExecutor`? ---
[jira] [Commented] (FLINK-8955) Port ClassLoaderITCase to flip6
[ https://issues.apache.org/jira/browse/FLINK-8955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437667#comment-16437667 ] ASF GitHub Bot commented on FLINK-8955: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5780 The `ClassLoaderITCase` fails on Travis. There are also checkstyle violations: ``` 10:04:01.988 [ERROR] src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java:[62,15] (imports) UnusedImports: Unused import: org.junit.Assert.assertEquals. 10:04:01.988 [ERROR] src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java:[65,15] (imports) UnusedImports: Unused import: org.junit.Assert.fail. ``` > Port ClassLoaderITCase to flip6 > --- > > Key: FLINK-8955 > URL: https://issues.apache.org/jira/browse/FLINK-8955 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5780: [FLINK-8955][tests] Port ClassLoaderITCase to flip6
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5780 The `ClassLoaderITCase` fails on Travis. There are also checkstyle violations: ``` 10:04:01.988 [ERROR] src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java:[62,15] (imports) UnusedImports: Unused import: org.junit.Assert.assertEquals. 10:04:01.988 [ERROR] src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java:[65,15] (imports) UnusedImports: Unused import: org.junit.Assert.fail. ``` ---
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437604#comment-16437604 ] Till Rohrmann commented on FLINK-9143: -- The problem is that we don't necessarily have the cluster configuration when we submit the job. Consequently, it is a priori not possible to decide whether the cluster has a default restart strategy or not. The underlying problem with the restart strategies is that it is a cluster configuration as well as a job configuration option. I think it should only be a cluster configuration option. If we don't want to break the behavior, then a solution could be to differentiate between an explicitly set job-specific restart strategy and the default job-specific restart strategy set when submitting a streaming job with checkpointing enabled. > Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: yuqi >Priority: Major > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > Restart strategy defined in flink-conf.yaml is disregarded when the user enables > checkpointing. > Steps to reproduce: > 1. Download flink distribution (1.4.2), update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > file:///tmp/nfsrecovery/flink-checkpoints-metadata > state.backend.rocksdb.checkpointdir: > file:///tmp/nfsrecovery/flink-checkpoints-rocksdb > > 2. 
create new java project as described at
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
> here's the code:
> public class FailedJob
> {
>     static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>     public static void main( String[] args ) throws Exception
>     {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>         DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>         stream.map(new MapFunction<String, String>(){
>             @Override
>             public String map(String obj) {
>                 throw new NullPointerException("NPE");
>             }
>         });
>         env.execute("Failed job");
>     }
> }
>
> 3. Compile: mvn clean package; submit it to the cluster
>
> 4. Go to Job Manager configuration in WebUI, ensure settings from flink-conf.yaml are there (screenshot attached)
>
> 5. Go to Job's configuration, see Execution Configuration section
>
> *Expected result*: restart strategy as defined in flink-conf.yaml
>
> *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart attempts.
>
> see attached screenshots and jobmanager log (lines 1 and 31)
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
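Till's diagnosis above can be condensed into a small sketch. The class, enum, and method below are hypothetical illustrations, not Flink's actual `RestartStrategies` API; they only model the precedence the report describes: when checkpointing is enabled and the job sets no explicit restart strategy, an implicit fixed-delay strategy (Integer.MAX_VALUE attempts, 1 ms delay) is injected, and the cluster-level `restart-strategy: none` from flink-conf.yaml is never consulted.

```java
// Hypothetical sketch (not Flink's actual RestartStrategies API) of the
// restart-strategy precedence described in FLINK-9143.
class RestartStrategyResolution {

    enum Strategy { NONE, FIXED_DELAY, CLUSTER_DEFAULT }

    /** jobStrategy == null means user code never called setRestartStrategy(). */
    static Strategy resolve(Strategy jobStrategy, boolean checkpointingEnabled) {
        if (jobStrategy != null) {
            return jobStrategy;              // an explicit job-level setting always wins
        }
        if (checkpointingEnabled) {
            return Strategy.FIXED_DELAY;     // implicit default shadows the cluster config
        }
        return Strategy.CLUSTER_DEFAULT;     // only now does flink-conf.yaml apply
    }

    public static void main(String[] args) {
        // The reported case: checkpointing on, no job-level strategy set.
        System.out.println(resolve(null, true));
    }
}
```

Till's suggested fix amounts to distinguishing, inside such a `resolve` step, between a strategy the user set explicitly and the default injected for checkpointed jobs, so that the latter can yield to the cluster configuration.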
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437601#comment-16437601 ] ASF GitHub Bot commented on FLINK-9168: --- GitHub user XiaoZYang opened a pull request: https://github.com/apache/flink/pull/5845 [FLINK-9168][flink-connectors]Pulsar Sink connector ## What is the purpose of the change Provide a [pulsar](https://github.com/apache/incubator-pulsar) sink connector for Flink. ## Brief change log - add `PulsarTableSink` - add `PulsarJsonTableSink` ## Verifying this change Added tests that validate that the target classes (i.e. `PulsarTableSink` and `PulsarJsonTableSink`) work as expected. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) upgrade `org.javassist` version to 3.20.0-GA - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (don't know) - The runtime per-record code paths (performance sensitive): (don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (don't know) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? 
(JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/XiaoZYang/flink pulsar-connector Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5845.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5845 commit fff36440197e3f34caf3af0b7ed7ba5320003e3b Author: Sijie Guo Date: 2018-03-02T05:08:53Z Add pulsar flink connector commit 447a998ce9d4de2dbb7b099b0568c5aed9f374bd Author: xiaozongyang Date: 2018-04-13T07:51:49Z 1. update pom.xml to follow flink version > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Zongyang Xiao >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
Till Rohrmann created FLINK-9169: Summary: NPE when restoring from old savepoint and state serializer could not be deserialized Key: FLINK-9169 URL: https://issues.apache.org/jira/browse/FLINK-9169 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.4.2, 1.5.0 Reporter: Till Rohrmann A user reported observing the following exception when restoring a Flink job from a 1.3 savepoint with Flink 1.4.

{code}
2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Could not initialize keyed state backend.
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
	... 6 more
{code}

Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create a {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose {{stateSerializer}} can be {{null}}. That by itself is not the problem; however, in {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} and null-check the state serializer there. This then fails with a nondescript NPE. I think the same will happen when resuming with Flink 1.5 from a 1.4 savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
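The fix direction implied above, failing with a descriptive message where the snapshot's serializer is null instead of letting `Preconditions.checkNotNull` throw a bare NPE, can be sketched as follows. The class and method names are illustrative, not Flink's actual restore path.

```java
import java.io.IOException;

// Illustrative sketch (not Flink's actual restore code): surface a descriptive
// IOException when the state serializer read from the savepoint is null,
// instead of the bare NullPointerException reported in FLINK-9169.
class SerializerRestoreCheck {

    static Object checkRestoredSerializer(Object deserializedSerializer, String stateName) throws IOException {
        if (deserializedSerializer == null) {
            throw new IOException(
                "The serializer for state '" + stateName + "' could not be deserialized from the "
                + "savepoint. The serializer class may be missing from the user code classloader "
                + "or incompatible with this Flink version.");
        }
        return deserializedSerializer;
    }
}
```

A check of this shape at deserialization time would point the user at the classloading or compatibility problem instead of at `Preconditions.java:58`.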
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437396#comment-16437396 ] ASF GitHub Bot commented on FLINK-8370: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181414763 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java --- @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +/** + * An interface for accumulating double values. + */ +interface DoubleAccumulator { --- End diff -- This is virtually the same as `org.apache.flink.runtime.rest.handler.legacy.metrics.DoubleAccumulator`. Why do we have to duplicate it? 
``` diff ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/DoubleAccumulator.java ./flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/DoubleAccumulator.java 19c19 < package org.apache.flink.runtime.rest.handler.legacy.metrics; --- > package org.apache.flink.runtime.rest.handler.job.metrics; ``` > Port AbstractAggregatingMetricsHandler to RestServerEndpoint > > > Key: FLINK-8370 > URL: https://issues.apache.org/jira/browse/FLINK-8370 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Port subclasses of > {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractAggregatingMetricsHandler}} > to new FLIP-6 {{RestServerEndpoint}}. > The following handlers need to be migrated: > * {{AggregatingJobsMetricsHandler}} > * {{AggregatingSubtasksMetricsHandler}} > * {{AggregatingTaskManagersMetricsHandler}} > New handlers should then be registered in {{WebMonitorEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437392#comment-16437392 ] ASF GitHub Bot commented on FLINK-8370: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181408771 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. 
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Override + Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); --- End diff -- I think there is a potential NPE because `store.getTaskMetricStore(jobID.toString(), taskID.toString())` can return `null`. > Port AbstractAggregatingMetricsHandler to RestServerEndpoint > > > Key: FLINK-8370 > URL: https://issues.apache.org/jira/browse/FLINK-8370 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >
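The null-guard this review comment asks for could look like the following. `TaskStoreLookup` is a hypothetical stand-in for Flink's `MetricStore`; the point is only the guard around the possibly-null task store.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for MetricStore (not Flink code): returning an empty
// collection when the task metric store is absent avoids the NPE flagged in
// the review; alternatively the handler could answer 404 NOT FOUND here.
class TaskStoreLookup {

    static Collection<String> getStores(Map<String, Collection<String>> taskStores, String taskId) {
        Collection<String> stores = taskStores.get(taskId);  // may be null, like getTaskMetricStore()
        if (stores == null) {
            return Collections.emptyList();
        }
        return stores;
    }
}
```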
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437388#comment-16437388 ] ASF GitHub Bot commented on FLINK-8370: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181401972 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Locale; + +/** + * TODO: add javadoc. --- End diff -- Should be replaced. 
> Port AbstractAggregatingMetricsHandler to RestServerEndpoint > > > Key: FLINK-8370 > URL: https://issues.apache.org/jira/browse/FLINK-8370 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Chesnay Schepler >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Port subclasses of > {{org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractAggregatingMetricsHandler}} > to new FLIP-6 {{RestServerEndpoint}}. > The following handlers need to be migrated: > * {{AggregatingJobsMetricsHandler}} > * {{AggregatingSubtasksMetricsHandler}} > * {{AggregatingTaskManagersMetricsHandler}} > New handlers should then be registered in {{WebMonitorEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437391#comment-16437391 ] ASF GitHub Bot commented on FLINK-8370: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181411248 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. 
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Override + Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); + } else { + Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges); + Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8); + for (int subtask : subtasks) { + subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask)); + } + return subtaskStores; + } + } + + private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437389#comment-16437389 ] ASF GitHub Bot commented on FLINK-8370: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181404252 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java --- @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics + * or the aggregated values of them across all/selected entities. 
+ * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned. + * {@code /metrics?get=X,Y&agg=min,max} + * The handler will then return a list of objects containing the aggregations for the requested metrics. + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]} + */ +public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> { + + private final Executor executor; + private final MetricFetcher fetcher; + + protected AbstractAggregatingMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, +
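The four aggregations named in the handler javadoc (sum, max, min, avg) reduce one metric's values across all selected entities. A minimal, standalone sketch of that computation (not the handler's actual accumulator classes):

```java
import java.util.DoubleSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.DoubleStream;

// Standalone sketch (not Flink's DoubleAccumulator implementations) of the
// sum/max/min/avg aggregation the handler javadoc describes.
class MetricAggregation {

    static Map<String, Double> aggregate(double... values) {
        DoubleSummaryStatistics stats = DoubleStream.of(values).summaryStatistics();
        Map<String, Double> result = new LinkedHashMap<>();
        result.put("min", stats.getMin());
        result.put("max", stats.getMax());
        result.put("sum", stats.getSum());
        result.put("avg", stats.getAverage());
        return result;
    }
}
```

For a request that names specific aggregations via the "agg" parameter, the handler would emit only the requested subset of these keys per metric id.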
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437393#comment-16437393 ] ASF GitHub Bot commented on FLINK-8370: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181411022 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. 
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Override + Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); + } else { + Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges); + Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8); + for (int subtask : subtasks) { + subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask)); + } + return subtaskStores; + } + } + + private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) {
[jira] [Commented] (FLINK-8370) Port AbstractAggregatingMetricsHandler to RestServerEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437390#comment-16437390 ] ASF GitHub Bot commented on FLINK-8370: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181404370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java --- @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Abstract request handler for querying aggregated metrics. Subclasses return either a list of all available metrics + * or the aggregated values of them across all/selected entities. 
+ * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. + * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * The "agg" query parameter is used to define which aggregates should be calculated. Available aggregations are + * "sum", "max", "min" and "avg". If the parameter is not specified, all aggregations will be returned. + * {@code /metrics?get=X,Y&agg=min,max} + * The handler will then return a list of objects containing the aggregations for the requested metrics. + * {@code [ { "id" : "X", "min", "1", "max", "2" }, { "id" : "Y", "min", "4", "max", "10"}]} + */ +public abstract class AbstractAggregatingMetricsHandler<P extends AbstractAggregatedMetricsParameters<?>> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AggregatedMetricsResponseBody, P> { + + private final Executor executor; + private final MetricFetcher fetcher; + + protected AbstractAggregatingMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, +
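The sum/max/min/avg semantics described in the handler's javadoc can be sketched independently of the Flink classes; the names below are illustrative stand-ins, not the PR's API:

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;

// Illustrative: aggregate one metric's per-entity values into
// min/max/avg/sum, as the "agg" query parameter requests.
public class MetricAggregation {
    static double[] aggregate(List<Double> values) {
        DoubleSummaryStatistics stats = values.stream()
            .mapToDouble(Double::doubleValue)
            .summaryStatistics();
        return new double[] {stats.getMin(), stats.getMax(), stats.getAverage(), stats.getSum()};
    }

    public static void main(String[] args) {
        double[] agg = aggregate(Arrays.asList(1.0, 2.0, 4.0, 10.0));
        System.out.println(Arrays.toString(agg)); // [1.0, 10.0, 4.25, 17.0]
    }
}
```

When no "agg" parameter is given, a handler following this javadoc would emit all four values per metric; with `agg=min,max` it would keep only those two.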
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181404252 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java --- @@ -0,0 +1,302 @@ ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181411022 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.SubtasksFilterQueryParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.UnionIterator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Request handler that returns, aggregated across subtasks, a list of all available metrics or the values + * for a set of metrics. + * + * Specific subtasks can be selected for aggregation by specifying a comma-separated list of integer ranges. 
+ * {@code /metrics?get=X,Y&subtasks=0-2,4-5} + */ +public class AggregatingSubtasksMetricsHandler extends AbstractAggregatingMetricsHandler<AggregatedSubtaskMetricsParameters> { + + public AggregatingSubtasksMetricsHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + Executor executor, + MetricFetcher fetcher) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, AggregatedSubtaskMetricsHeaders.getInstance(), executor, fetcher); + } + + @Override + Collection<? extends MetricStore.ComponentMetricStore> getStores(MetricStore store, HandlerRequest<EmptyRequestBody, AggregatedSubtaskMetricsParameters> request) { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class); + + Collection<String> subtaskRanges = request.getQueryParameter(SubtasksFilterQueryParameter.class); + if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); + } else { + Iterable<Integer> subtasks = getIntegerRangeFromString(subtaskRanges); + Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8); + for (int subtask : subtasks) { + subtaskStores.add(store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask)); + } + return subtaskStores; + } + } + + private Iterable<Integer> getIntegerRangeFromString(Collection<String> ranges) { + UnionIterator<Integer> iterators = new UnionIterator<>(); + + for (String rawRange : ranges) { + try { + Iterator<Integer> rangeIterator; +
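The subtask filter quoted above accepts a comma-separated list of integer ranges such as "0-2,4-5". A self-contained sketch of that parsing (a hypothetical helper mirroring the diff's getIntegerRangeFromString, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: expand a "subtasks" filter such as "0-2,4-5" into the
// individual subtask indexes it covers.
public class SubtaskRanges {
    static List<Integer> parse(String rawRanges) {
        List<Integer> result = new ArrayList<>();
        for (String range : rawRanges.split(",")) {
            int dash = range.indexOf('-');
            if (dash == -1) {
                // single index, e.g. "7"
                result.add(Integer.parseInt(range));
            } else {
                // inclusive range, e.g. "0-2" -> 0, 1, 2
                int start = Integer.parseInt(range.substring(0, dash));
                int end = Integer.parseInt(range.substring(dash + 1));
                for (int i = start; i <= end; i++) {
                    result.add(i);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("0-2,4-5")); // [0, 1, 2, 4, 5]
    }
}
```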
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181408771 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ if (subtaskRanges.isEmpty()) { + return store.getTaskMetricStore(jobID.toString(), taskID.toString()).getAllSubtaskMetricStores(); --- End diff -- I think there is a potential NPE because `store.getTaskMetricStore(jobID.toString(), taskID.toString())` can return `null`. ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181401972 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsAggregationParameter.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Locale; + +/** + * TODO: add javadoc. --- End diff -- Should be replaced. ---
[GitHub] flink pull request #5805: [FLINK-8370][REST] Port AggregatingMetricsHandler ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5805#discussion_r181411248 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java --- @@ -0,0 +1,129 @@ ---
[jira] [Commented] (FLINK-9165) Improve CassandraSinkBase to send Collections
[ https://issues.apache.org/jira/browse/FLINK-9165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437261#comment-16437261 ] ASF GitHub Bot commented on FLINK-9165: --- Github user Bekreth closed the pull request at: https://github.com/apache/flink/pull/5844 > Improve CassandraSinkBase to send Collections > - > > Key: FLINK-9165 > URL: https://issues.apache.org/jira/browse/FLINK-9165 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.2 >Reporter: Christopher Hughes >Priority: Minor > Labels: easyfix, feature > > The CassandraSinkBase can currently only handle individual objects. I > propose overloading the `send(IN value)` method to include > `send(Collection<IN> value)`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
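The overload proposed in FLINK-9165 could be sketched like this (illustrative only; `BulkSender` and its `send` methods are stand-ins, not the actual CassandraSinkBase API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Sketch: complement a per-record send(IN) with a bulk send(Collection<IN>)
// that delegates element-by-element, so the two paths share one write path.
public class BulkSender<IN> {
    final List<IN> sent = new ArrayList<>();

    void send(IN value) {
        sent.add(value); // stand-in for issuing one Cassandra write
    }

    void send(Collection<IN> values) {
        for (IN value : values) {
            send(value); // reuse the single-record path
        }
    }

    public static void main(String[] args) {
        BulkSender<String> sender = new BulkSender<>();
        sender.send(Arrays.asList("a", "b", "c"));
        System.out.println(sender.sent.size()); // 3
    }
}
```

One caveat with this shape: if `IN` were itself a `Collection` type, the two overloads would become ambiguous at the call site, which is a reason to give the bulk variant a distinct name instead.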
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437175#comment-16437175 ] ASF GitHub Bot commented on FLINK-5479: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5634 @juhoautio Your concerns for this fix is quite correct, and is why this PR was closed in the first place as there are a lot of ill-defined semantics introduced by this. Regarding your thought on relating idleness to empty results returned from Kafka: I think that seems like a good approach, and should also capture Eron's comment quite well. > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions > of a consumer subtask will never proceed. > It's normally not a common case to have Kafka partitions not producing any > data, but it'll probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437030#comment-16437030 ] Zongyang Xiao edited comment on FLINK-9168 at 4/13/18 9:01 AM: --- Related works will be done by [~si...@apache.org], [~zhaijia] and [~xiaozongyang_bupt] . was (Author: xiaozongyang_bupt): Related works will be done by [~si...@apache.org] and [~xiaozongyang_bupt] . > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Zongyang Xiao >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437030#comment-16437030 ] Xiao Zongyang commented on FLINK-9168: -- Related works will be done by [~si...@apache.org] and [~xiaozongyang_bupt] . > Pulsar Sink Connector > - > > Key: FLINK-9168 > URL: https://issues.apache.org/jira/browse/FLINK-9168 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Xiao Zongyang >Priority: Minor > Fix For: 1.6.0 > > > Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9168) Pulsar Sink Connector
Xiao Zongyang created FLINK-9168: Summary: Pulsar Sink Connector Key: FLINK-9168 URL: https://issues.apache.org/jira/browse/FLINK-9168 Project: Flink Issue Type: New Feature Components: Streaming Connectors Affects Versions: 1.6.0 Reporter: Xiao Zongyang Fix For: 1.6.0 Flink does not provide a sink connector for Pulsar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437021#comment-16437021 ] ASF GitHub Bot commented on FLINK-5479: --- Github user juhoautio commented on the issue: https://github.com/apache/flink/pull/5634 @tzulitai did you ever test your code? I tried it and it allowed watermarks to proceed but apparently too aggressively, as it caused a lot of data loss. I'm looking for a quick fix for this issue, as it seems that FLINK-5479 won't be fixed too soon. So I would very much like to hear if you have been able to fix this in some lighter way. My understanding of your PR is that it doesn't work reliably because it just seems to add an internal timeout, that could be surpassed whenever the consumer is for example busy consuming other partitions. Please comment if this perception is wrong. I'm thinking that it should instead get the information that a partition was idle from the kafka client, and only in that case (empty result from client) create a newer watermark for that partition. It shouldn't mark the partition to some idle state – and shouldn't create newer watermarks periodically without any connection to another empty result from the client. New watermarks should be only generated as a callback of the kafka client result..? 
> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.6.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions > of a consumer subtask will never proceed. > It's normally not a common case to have Kafka partitions not producing any > data, but it'll probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user juhoautio commented on the issue: https://github.com/apache/flink/pull/5634 @tzulitai did you ever test your code? I tried it and it allowed watermarks to proceed but apparently too aggressively, as it caused a lot of data loss. I'm looking for a quick fix for this issue, as it seems that FLINK-5479 won't be fixed too soon. So I would very much like to hear if you have been able to fix this in some lighter way. My understanding of your PR is that it doesn't work reliably because it just seems to add an internal timeout, that could be surpassed whenever the consumer is for example busy consuming other partitions. Please comment if this perception is wrong. I'm thinking that it should instead get the information that a partition was idle from the kafka client, and only in that case (empty result from client) create a newer watermark for that partition. It shouldn't mark the partition to some idle state – and shouldn't create newer watermarks periodically without any connection to another empty result from the client. New watermarks should be only generated as a callback of the kafka client result..? ---
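The mechanism discussed in the comments above (mark a partition idle only when the Kafka client returns an empty fetch for it, and exclude idle partitions from the min-watermark calculation) can be sketched in isolation. The sketch below is a hypothetical simplification; the class and method names are illustrative and are not Flink's actual `AbstractFetcher` API.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal simulation of the proposal: a partition is marked idle only when
// the Kafka client returns an empty fetch for it, and idle partitions are
// excluded from the min-watermark so a single empty partition cannot pin
// the overall watermark at Long.MIN_VALUE.
class PerPartitionWatermarks {
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private final Map<Integer, Boolean> idle = new HashMap<>();

    void onRecord(int partition, long timestamp) {
        idle.put(partition, false); // any record makes the partition active again
        watermarks.merge(partition, timestamp, Long::max);
    }

    void onEmptyFetch(int partition) {
        idle.put(partition, true); // only an empty client result marks it idle
        watermarks.putIfAbsent(partition, Long.MIN_VALUE);
    }

    long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            if (Boolean.TRUE.equals(idle.get(e.getKey()))) {
                continue; // idle partitions do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, e.getValue());
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With this shape, watermark generation is driven directly by the client's fetch results rather than by an internal timeout, which is the distinction juhoautio draws above.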
[jira] [Commented] (FLINK-6557) RocksDBKeyedStateBackend broken on Windows
[ https://issues.apache.org/jira/browse/FLINK-6557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436999#comment-16436999 ] ASF GitHub Bot commented on FLINK-6557: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/3899 > RocksDBKeyedStateBackend broken on Windows > -- > > Key: FLINK-6557 > URL: https://issues.apache.org/jira/browse/FLINK-6557 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.3.0, 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > > The {{RocksDBKeyedStateBackend}} cannot be used on Windows. We pass the > result of {{org.apache.flink.core.fs.Path#getPath()}} to RocksDB which will > fail when trying to create the checkpoint file as the path begins with a > slash which isn't a valid Windows path: > {code} > /C:/Users/Zento/AppData/Local/Temp/junit572330160893758355/junit5754599533651878867/job-ecbdb9df76fd3a39108dac7c515e3214_op-Test_uuid-6a43f1f6-1f35-443e-945c-aab3643e62fc/chk-0.tmp > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #3899: [FLINK-6557] Fix RocksDBKeyedStateBackend on Windo...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/3899 ---
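The root cause reported in FLINK-6557 is that `Path#getPath()` yields a leading-slash form like `/C:/Users/...` for local Windows files, which RocksDB rejects. The helper below is hypothetical, a sketch of one possible normalization, not the actual fix from PR #3899.

```java
// Hypothetical normalization for the "/C:/Users/..." form that
// Path#getPath() produces for local Windows files.
class LocalPathUtil {
    static String toLocalOsPath(String flinkPath) {
        // "/C:/..." starts with a slash followed by a drive letter and a
        // colon; Windows APIs reject that leading slash, so strip it.
        if (flinkPath.length() >= 3
                && flinkPath.charAt(0) == '/'
                && Character.isLetter(flinkPath.charAt(1))
                && flinkPath.charAt(2) == ':') {
            return flinkPath.substring(1);
        }
        return flinkPath; // POSIX-style paths pass through unchanged
    }
}
```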
[jira] [Comment Edited] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
[ https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436997#comment-16436997 ] Chesnay Schepler edited comment on FLINK-8867 at 4/13/18 8:22 AM: -- FLINK-6557 may be relevant as it is also caused by passing Flink `Paths` to RocksDB. was (Author: zentol): FLINK-6557 may be relevant as it is also caused by passing Flink `Path` for local files to RocksDB. > Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config > > > Key: FLINK-8867 > URL: https://issues.apache.org/jira/browse/FLINK-8867 > Project: Flink > Issue Type: Bug > Components: Configuration, State Backends, Checkpointing, YARN >Affects Versions: 1.4.1, 1.4.2 >Reporter: Shashank Agarwal >Assignee: Stefan Richter >Priority: Critical > Fix For: 1.5.0 > > > In our setup, when we put an entry in our Flink_conf file for default schema. > {code} > fs.default-scheme: hdfs://mydomain.com:8020/flink > {code} > Than application with rocksdb state backend fails with the following > exception. When we remove this config it works fine. It's working fine with > other state backends. > {code} > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 > for operator order ip stream (1/1).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 1 for > operator order ip stream (1/1). > ... 
6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89) > ... 
7 more > Caused by: java.lang.IllegalStateException > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ... 5 more > [CIRCULAR REFERENCE:java.lang.IllegalStateException] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8867) Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config
[ https://issues.apache.org/jira/browse/FLINK-8867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436997#comment-16436997 ] Chesnay Schepler commented on FLINK-8867: - FLINK-6557 may be relevant as it is also caused by passing Flink `Path` for local files to RocksDB. > Rocksdb checkpointing failing with fs.default-scheme: hdfs:// config > > > Key: FLINK-8867 > URL: https://issues.apache.org/jira/browse/FLINK-8867 > Project: Flink > Issue Type: Bug > Components: Configuration, State Backends, Checkpointing, YARN >Affects Versions: 1.4.1, 1.4.2 >Reporter: Shashank Agarwal >Assignee: Stefan Richter >Priority: Critical > Fix For: 1.5.0 > > > In our setup, when we put an entry in our Flink_conf file for default schema. > {code} > fs.default-scheme: hdfs://mydomain.com:8020/flink > {code} > Than application with rocksdb state backend fails with the following > exception. When we remove this config it works fine. It's working fine with > other state backends. > {code} > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 > for operator order ip stream (1/1).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 1 for > operator order ip stream (1/1). > ... 
6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89) > ... 
7 more > Caused by: java.lang.IllegalStateException > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:926) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:389) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:386) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ... 5 more > [CIRCULAR REFERENCE:java.lang.IllegalStateException] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
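One way to understand the FLINK-8867 failure mode: with `fs.default-scheme: hdfs://...` set, a path handed to RocksDB (which needs a plain local directory) may end up carrying a non-local scheme. The guard below is a hypothetical illustration of the check, not Flink's actual code.

```java
import java.net.URI;

// Hypothetical guard: reject any path whose scheme is not local before
// handing it to RocksDB as an instance directory.
class LocalPathGuard {
    static String requireLocalPath(String configured) {
        URI uri = URI.create(configured);
        String scheme = uri.getScheme();
        // No scheme or "file" means a genuinely local path; anything else
        // (hdfs, s3, ...) cannot be used as a RocksDB directory.
        if (scheme != null && !scheme.equals("file")) {
            throw new IllegalStateException(
                    "RocksDB needs a local directory but got scheme '" + scheme + "': " + configured);
        }
        return uri.getPath();
    }
}
```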
[jira] [Commented] (FLINK-9165) Improve CassandraSinkBase to send Collections
[ https://issues.apache.org/jira/browse/FLINK-9165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16436994#comment-16436994 ] ASF GitHub Bot commented on FLINK-9165: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5844 Even then `invoke(Collection)` would never be called. `IN` is always the type of the `DataStream` the sink is applied on, and it can't be `Collection` as you would have an infinitely recursive type. You need to create a subclass that extends `CassandraSinkBase` which can then be applied to `DataStream `, and that you can already do. Note that it is rather unusual for the stream to contain collections, so I doubt we would be adding much value here. If instead you want a sink of type `IN` that buffers elements and sends them in batches you can use the `CassandraWriteAheadSink`. Finally, do note that Cassandra batches are a far cry from transactions. IIRC the guarantees work as such that either the batch completes successfully in which case all elements are written, or it fails in which case _some_ records may have been written. > Improve CassandraSinkBase to send Collections > - > > Key: FLINK-9165 > URL: https://issues.apache.org/jira/browse/FLINK-9165 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Affects Versions: 1.4.2 >Reporter: Christopher Hughes >Priority: Minor > Labels: easyfix, feature > > The CassandraSinkBase can currently only handle individual objects. I > propose overloading the `send(IN value)` method to include > `send(Collection value)`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5844: [FLINK-9165] [DataSink] Upgrading CassandraSinkBase to su...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5844 Even then `invoke(Collection)` would never be called. `IN` is always the type of the `DataStream` the sink is applied on, and it can't be `Collection` as you would have an infinitely recursive type. You need to create a subclass that extends `CassandraSinkBase` which can then be applied to `DataStream `, and that you can already do. Note that it is rather unusual for the stream to contain collections, so I doubt we would be adding much value here. If instead you want a sink of type `IN` that buffers elements and sends them in batches you can use the `CassandraWriteAheadSink`. Finally, do note that Cassandra batches are a far cry from transactions. IIRC the guarantees work as such that either the batch completes successfully in which case all elements are written, or it fails in which case _some_ records may have been written. ---
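zentol's suggestion, a subclass whose element type is itself a collection, can be illustrated with stubbed-out types. The `SinkFunction` below is a stand-in for Flink's interface, not the real one; the point is only the generics.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for Flink's SinkFunction; the real interface differs.
interface SinkFunction<IN> {
    void invoke(IN value) throws Exception;
}

// Parameterizing the sink with Collection<T> makes invoke(Collection<T>)
// the ordinary invoke method, instead of an unreachable overload on a base
// class whose IN is already the stream's element type.
class CollectionSink<T> implements SinkFunction<Collection<T>> {
    final List<T> written = new ArrayList<>();

    @Override
    public void invoke(Collection<T> values) {
        for (T v : values) {
            written.add(v); // each element sent individually, like send(value)
        }
    }
}
```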
[jira] [Updated] (FLINK-9166) Performance issue with Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] SUBRAMANYA SURESH updated FLINK-9166: - Description: With a high number of Flink SQL queries (100 of below), the Flink command line client fails with a "JobManager did not respond within 60 ms" on a Yarn cluster. * JobManager logs has nothing after the last TaskManager started except DEBUG logs with "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating its likely stuck (creating the ExecutionGraph?). * The same works as standalone java program locally (high CPU initially) * Note: Each Row in structStream contains 515 columns (many end up null) including a column that has the raw message. * In the YARN cluster we specify 18GB for TaskManager, 18GB for the JobManager, 145 TaskManagers with 5 slots each and parallelism of 725 (partitions in our Kafka source). *Query:* {code:java} select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source from structStream where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' and Outcome='Success' group by tumble(proctime, INTERVAL '1' SECOND), Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source {code} *Code:* {code:java} public static void main(String[] args) throws Exception { FileSystems.newFileSystem(KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(), new HashMap<>()); final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment(); final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment); final DataStream structStream = getKafkaStreamOfRows(streamingEnvironment); tableEnv.registerDataStream("structStream", structStream); tableEnv.scan("structStream").printSchema(); for (int i = 0; i < 100; i++){ for (String query : Queries.sample){ // Queries.sample has one query that is above. 
Table selectQuery = tableEnv.sqlQuery(query); DataStream selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class); selectQueryStream.print(); } } // execute program streamingEnvironment.execute("Kafka Streaming SQL"); } private static DataStream getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception { Properties properties = getKafkaProperties(); // TestDeserializer deserializes the JSON to a ROW of string columns (515) // and also adds a column for the raw message. FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties); DataStream stream = environment.addSource(consumer); return stream; } private static RowTypeInfo getRowTypeInfo() throws Exception { // This has 515 fields. List fieldNames = DDIManager.getDDIFieldNames(); fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer fieldNames.add("proctime"); // Fill typeInformationArray with StringType to all but the last field which is of type Time . return new RowTypeInfo(typeInformationArray, fieldNamesArray); } private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.enableCheckpointing(6); env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR)); env.setParallelism(725); return env; } {code} was: With a high number of Flink SQL queries (100 of below), the Flink command line client fails with a "JobManager did not respond within 60 ms" on a Yarn cluster. * JobManager logs has nothing after the last TaskManager started except DEBUG logs with "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating its likely stuck (creating the ExecutionGraph?). 
* The same works as standalone java program locally (high CPU initially) * Note: Each Row in structStream contains 515 columns (many end up null) including a column that has the raw message. * In the YARN cluster we specify 18GB for TaskManager, 18GB for the JobManager, 5 slots each and parallelism of 725 (partitions in our Kafka source). *Query:* {code:java} select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source from structStream where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' and Outcome='Success' group by tumble(proctime, INTERVAL '1' SECOND), Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source {code} *Code:* {code:java} public static void main(String[] args) throws Exception { FileSystems.newFileSystem(KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(), new HashMap<>()); final StreamExecutionEnvironment