[jira] [Commented] (FLINK-8399) Use independent configurations for the different timeouts in slot manager
[ https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319856#comment-16319856 ] ASF GitHub Bot commented on FLINK-8399: --- GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5271 [FLINK-8399] [runtime] use independent configurations for the different timeouts in slot manager ## What is the purpose of the change *This pull request separates the timeouts for slot requests to the task manager, slot requests to be discarded, and task managers to be released in the slot manager into three different configurations.* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8399 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5271.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5271 commit f7024439ead5e3848c705659bfe221b8ce50f154 Author: shuai.xus Date: 2018-01-10T07:43:20Z [FLINK-8399] [runtime] use independent configurations for the different timeouts in slot manager > Use independent configurations for the different timeouts in slot manager > - > > Key: FLINK-8399 > URL: https://issues.apache.org/jira/browse/FLINK-8399 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > There are three parameters in slot manager to indicate the timeout for slot > request to task manager, slot request to be discarded and task manager to be > released. But now they all come from the value of AkkaOptions.ASK_TIMEOUT, > need to use independent configurations for them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5271: [FLINK-8399] [runtime] use independent configurati...
GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5271 [FLINK-8399] [runtime] use independent configurations for the different timeouts in slot manager ## What is the purpose of the change *This pull request separates the timeouts for slot requests to the task manager, slot requests to be discarded, and task managers to be released in the slot manager into three different configurations.* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8399 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5271.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5271 commit f7024439ead5e3848c705659bfe221b8ce50f154 Author: shuai.xus Date: 2018-01-10T07:43:20Z [FLINK-8399] [runtime] use independent configurations for the different timeouts in slot manager ---
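As a rough illustration of the change described above, the three timeouts would each be declared as an independent `ConfigOption` instead of all being derived from `AkkaOptions.ASK_TIMEOUT`. This is a sketch only: the option keys and default values below are assumptions for illustration, not necessarily the ones chosen in the pull request.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/**
 * Sketch only: three independent slot manager timeouts, each backed by its own
 * configuration key. Key names and default values are illustrative assumptions.
 */
public class SlotManagerTimeoutOptions {

	/** Timeout for slot requests that the slot manager sends to a task manager. */
	public static final ConfigOption<Long> TASK_MANAGER_REQUEST_TIMEOUT =
		ConfigOptions.key("slotmanager.request-timeout")
			.defaultValue(600_000L);

	/** Timeout after which an unfulfilled slot request is discarded. */
	public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
		ConfigOptions.key("slotmanager.slot-request-timeout")
			.defaultValue(600_000L);

	/** Timeout after which an idle task manager is released. */
	public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT =
		ConfigOptions.key("slotmanager.taskmanager-timeout")
			.defaultValue(30_000L);
}
```

The slot manager would then read each value from the `Configuration` it is constructed with, rather than reusing a single Akka ask timeout for all three purposes.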
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319846#comment-16319846 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607315 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java --- @@ -27,6 +28,7 @@ * @deprecated Use {@link FlinkKafkaConsumer08} */ @Deprecated +@PublicEvolving --- End diff -- I don't think this annotation is needed here, since the class is deprecated already. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319852#comment-16319852 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607865 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java --- @@ -17,12 +17,15 @@ package org.apache.flink.streaming.connectors.kafka.partitioner; +import org.apache.flink.annotation.Internal; + import java.io.Serializable; /** * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records * across partitions of multiple Kafka topics. */ +@Internal --- End diff -- `FlinkKafkaPartitioner` should be `@PublicEvolving`. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
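For context, the suggestion above is about Flink's stability annotations: `@Internal` marks classes that users are not expected to rely on, while `@PublicEvolving` marks user-facing API whose signature may still evolve. Since users subclass `FlinkKafkaPartitioner` to implement custom partitioning, the reviewer asks for the latter. A condensed sketch of the class with the suggested annotation; the method signatures follow the class under review, but treat the details as illustrative.

```java
import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
 * Users extend this class to control how records are spread across Kafka
 * partitions, so it is part of the user-facing API surface.
 */
@PublicEvolving
public abstract class FlinkKafkaPartitioner<T> implements Serializable {

	private static final long serialVersionUID = 1L;

	/** Called once per parallel sink instance before partitioning starts. */
	public void open(int parallelInstanceId, int parallelInstances) {
		// default: nothing to initialize
	}

	/** Returns the Kafka partition that the given record should be written to. */
	public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
```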
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319851#comment-16319851 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607372 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java --- @@ -31,6 +32,7 @@ * @deprecated Use {@link FlinkKafkaProducer08}. */ @Deprecated +@PublicEvolving --- End diff -- Same here. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319847#comment-16319847 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608176 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java --- @@ -37,6 +38,7 @@ * Result byte[] messages can be deserialized using * {@link JsonRowDeserializationSchema}. */ +@Internal --- End diff -- Same here. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319854#comment-16319854 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608074 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java --- @@ -37,6 +38,7 @@ * * Failure during deserialization are forwarded as wrapped IOExceptions. */ +@Internal --- End diff -- This could also be `@PublicEvolving`, I don't think there's a problem with the user using it directly. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319850#comment-16319850 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608063 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java --- @@ -37,6 +38,7 @@ * Metadata fields can be accessed by calling objectNode.get("metadata").get(name>).as(type>) and include * the "offset" (long), "topic" (String) and "partition" (int). */ +@Internal --- End diff -- This could also be `@PublicEvolving`, I don't think there's a problem with the user using it directly. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319853#comment-16319853 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608223 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java --- @@ -29,6 +30,7 @@ * * @param The type created by the keyed deserialization schema. */ +@Internal --- End diff -- `KeyedDeserializationSchema` should be `@PublicEvolving`. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319848#comment-16319848 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608269 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java --- @@ -26,6 +28,7 @@ * * @param The type to be serialized. */ +@Internal --- End diff -- `KeyedSerializationSchema` should be `@PublicEvolving`. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319844#comment-16319844 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607814 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java --- @@ -50,6 +51,7 @@ * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will * cause a lot of network connections between all the Flink instances and all the Kafka brokers). */ +@Internal --- End diff -- Not sure here. This could be `@PublicEvolving`. It could very well be that the user simply instantiates a `FlinkFixedPartitioner` as the provided custom partitioner. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
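The usage the reviewer has in mind looks roughly like the following: an application instantiates a `FlinkFixedPartitioner` itself and hands it to a Kafka producer sink, which is why annotating it `@Internal` would be misleading. Constructor shapes vary across connector versions (the 0.11 producer takes an `Optional` partitioner, for instance), so this is an illustrative sketch rather than a drop-in snippet.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class FixedPartitionerExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("a", "b", "c");

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");

		// The user instantiates the partitioner directly and passes it to the sink.
		stream.addSink(new FlinkKafkaProducer010<>(
			"example-topic",
			new SimpleStringSchema(),
			props,
			new FlinkFixedPartitioner<>()));

		env.execute("fixed-partitioner-example");
	}
}
```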
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319845#comment-16319845 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608041 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java --- @@ -29,6 +30,7 @@ * * Fields can be accessed by calling objectNode.get(name>).as(type>) */ +@Internal --- End diff -- This could also be `@PublicEvolving`, I don't think there's a problem with the user using it directly. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8276) Annotation for Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-8276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319849#comment-16319849 ] ASF GitHub Bot commented on FLINK-8276: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607359 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java --- @@ -27,6 +28,7 @@ * @deprecated Use {@link FlinkKafkaConsumer08} */ @Deprecated +@PublicEvolving --- End diff -- Same here. > Annotation for Kafka connector > -- > > Key: FLINK-8276 > URL: https://issues.apache.org/jira/browse/FLINK-8276 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.5.0 > > > See parent issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608176 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java --- @@ -37,6 +38,7 @@ * Result byte[] messages can be deserialized using * {@link JsonRowDeserializationSchema}. */ +@Internal --- End diff -- Same here. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607315 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java --- @@ -27,6 +28,7 @@ * @deprecated Use {@link FlinkKafkaConsumer08} */ @Deprecated +@PublicEvolving --- End diff -- I don't think this annotation is needed here, since the class is deprecated already. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608074 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java --- @@ -37,6 +38,7 @@ * * Failure during deserialization are forwarded as wrapped IOExceptions. */ +@Internal --- End diff -- This could also be `@PublicEvolving`, I don't think there's a problem with the user using it directly. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608041 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java --- @@ -29,6 +30,7 @@ * * Fields can be accessed by calling objectNode.get(name>).as(type>) */ +@Internal --- End diff -- This could also be `@PublicEvolving`, I don't think there's a problem with the user using it directly. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607865 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java --- @@ -17,12 +17,15 @@ package org.apache.flink.streaming.connectors.kafka.partitioner; +import org.apache.flink.annotation.Internal; + import java.io.Serializable; /** * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records * across partitions of multiple Kafka topics. */ +@Internal --- End diff -- `FlinkKafkaPartitioner` should be `@PublicEvolving`. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607814 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java --- @@ -50,6 +51,7 @@ * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will * cause a lot of network connections between all the Flink instances and all the Kafka brokers). */ +@Internal --- End diff -- Not sure here. This could be `@PublicEvolving`. It could very well be that the user simply instantiates a `FlinkFixedPartitioner` as the provided custom partitioner. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607372 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java --- @@ -31,6 +32,7 @@ * @deprecated Use {@link FlinkKafkaProducer08}. */ @Deprecated +@PublicEvolving --- End diff -- Same here. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608269 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java --- @@ -26,6 +28,7 @@ * * @param The type to be serialized. */ +@Internal --- End diff -- `KeyedSerializationSchema` should be `@PublicEvolving`. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160607359 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java --- @@ -27,6 +28,7 @@ * @deprecated Use {@link FlinkKafkaConsumer08} */ @Deprecated +@PublicEvolving --- End diff -- Same here. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608223 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java --- @@ -29,6 +30,7 @@ * * @param The type created by the keyed deserialization schema. */ +@Internal --- End diff -- `KeyedDeserializationSchema` should be `@PublicEvolving`. ---
[GitHub] flink pull request #5173: [FLINK-8276] [kafka connector] Properly annotate A...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5173#discussion_r160608063 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java --- @@ -37,6 +38,7 @@ * Metadata fields can be accessed by calling objectNode.get("metadata").get(name>).as(type>) and include * the "offset" (long), "topic" (String) and "partition" (int). */ +@Internal --- End diff -- This could also be `@PublicEvolving`, I don't think there's a problem with the user using it directly. ---
[jira] [Commented] (FLINK-8324) Expose another offsets metrics by using new metric API to specify user defined variables
[ https://issues.apache.org/jira/browse/FLINK-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319834#comment-16319834 ] ASF GitHub Bot commented on FLINK-8324: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5214 Good idea. I think we should also update the metrics documentation to cover this addition. > Expose another offsets metrics by using new metric API to specify user > defined variables > > > Key: FLINK-8324 > URL: https://issues.apache.org/jira/browse/FLINK-8324 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei >Priority: Trivial > Fix For: 1.5.0 > > > The {{current-offsets}} and {{committed-offsets}} metrics are now attached > with the topic name and partition id in the metric identity. > It is not convenient to use these metrics in Prometheus, because users usually > use the same metric group name to group metrics which have the same > meaning and use tags to select the individual metric. > For example, I would prefer to access the {{current-offsets}} metric group and use > a {{partition-x}} tag to get the offset of partition x, instead of getting the > metric directly from the {{current-offsets-partition-x}} metric. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5214: [FLINK-8324] [kafka] Expose another offsets metrics by us...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5214 Good idea. I think we should also update the metrics documentation to cover this addition. ---
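The idea being discussed: instead of encoding the partition in the metric name (`current-offsets-partition-x`), the offset gauge is registered in a sub-group created from a key/value pair, so reporters such as Prometheus expose the partition as a label on a single metric name. A minimal sketch of the registration pattern, assuming the key/value `addGroup` variant added for user-defined variables; the metric and variable names here are illustrative, not the connector's actual identifiers.

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class OffsetMetricsExample {

	/**
	 * Registers the current-offset gauge under a user-defined "partition" variable.
	 * All partitions then share the metric name "currentOffsets" and differ only
	 * by the value of the "partition" label.
	 */
	public static void registerOffsetGauge(MetricGroup consumerMetricGroup, int partitionId, long[] currentOffset) {
		consumerMetricGroup
			.addGroup("partition", String.valueOf(partitionId))   // key/value group -> reporter tag
			.gauge("currentOffsets", (Gauge<Long>) () -> currentOffset[0]);
	}
}
```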
[jira] [Created] (FLINK-8399) Use independent configurations for the different timeouts in slot manager
shuai.xu created FLINK-8399: --- Summary: Use independent configurations for the different timeouts in slot manager Key: FLINK-8399 URL: https://issues.apache.org/jira/browse/FLINK-8399 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu There are three parameters in slot manager to indicate the timeout for slot request to task manager, slot request to be discarded and task manager to be released. But now the all come from the value of AkkaOptions.ASK_TIMEOUT, need to use independent configurations for them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8399) Use independent configurations for the different timeouts in slot manager
[ https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-8399: Description: There are three parameters in slot manager to indicate the timeout for slot request to task manager, slot request to be discarded and task manager to be released. But now they all come from the value of AkkaOptions.ASK_TIMEOUT, need to use independent configurations for them. (was: There are three parameters in slot manager to indicate the timeout for slot request to task manager, slot request to be discarded and task manager to be released. But now the all come from the value of AkkaOptions.ASK_TIMEOUT, need to use independent configurations for them.) > Use independent configurations for the different timeouts in slot manager > - > > Key: FLINK-8399 > URL: https://issues.apache.org/jira/browse/FLINK-8399 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > There are three parameters in slot > manager to indicate the timeout for slot > request to task manager, slot request to be discarded and task manager to be > released. But now they all come from the value of AkkaOptions.ASK_TIMEOUT, > need to use independent configurations for them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 Regarding the race condition you mentioned: hmm, I can't seem to exactly nail down the "and only first `successfulCommits.inc()` can be omitted because of that" case you mentioned, could you elaborate on that a bit more? But yes, it seems like there certainly is a race condition here, and can even cause an NPE on: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L496 Seems like the `nextOffsetsToCommit` and its callback instance should be bundled as a single `AtomicReference` here ... Would you like to open a PR for that? I wanted to ask because you discovered it in the first place; if not I'll open a fix for it. ---
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319816#comment-16319816 ] ASF GitHub Bot commented on FLINK-8306: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 Regarding the race condition you mentioned: hmm, I can't seem to exactly nail down the "and only first `successfulCommits.inc()` can be omitted because of that" case you mentioned, could you elaborate on that a bit more? But yes, it seems like there certainly is a race condition here, and can even cause an NPE on: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L496 Seems like the `nextOffsetsToCommit` and its callback instance should be bundled as a single `AtomicReference` here ... Would you like to open a PR for that? I wanted to ask because you discovered it in the first place; if not I'll open a fix for it. > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to simply make that method non-final, that is not ideal since it > would be best that the method is left final to prevent overrides in > subclasses. > This suggests that offset committing functionality is too tightly coupled > with the {{AbstractFetcher}}, making it hard to perform concise tests to > verify offset committing. > I suggest that we decouple record fetching and offset committing as separate > services behind different interfaces. We should introduce a new interface, > say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we > can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
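A minimal sketch of the bundling idea mentioned above, using simplified stand-in types rather than the connector's real classes: publishing the offsets and their callback through a single `AtomicReference` means the consumer thread can never observe freshly set offsets paired with a stale or missing callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Sketch of the atomic handover; the callback and offset types are simplified
 * stand-ins, not the Kafka connector's actual classes.
 */
public class OffsetCommitHandover {

	/** Simplified stand-in for the connector's commit callback. */
	public interface CommitCallback {
		void onSuccess();
	}

	// Offsets and callback are published together, so the consumer thread can never
	// see new offsets combined with an old or null callback.
	private final AtomicReference<Tuple2<Map<Integer, Long>, CommitCallback>> nextOffsetsToCommit =
			new AtomicReference<>();

	/** Called from the checkpoint notification thread. */
	public void setOffsetsToCommit(Map<Integer, Long> offsets, CommitCallback callback) {
		nextOffsetsToCommit.set(Tuple2.of(offsets, callback));
	}

	/** Called from the Kafka consumer thread before it polls. */
	public void maybeCommit() {
		Tuple2<Map<Integer, Long>, CommitCallback> commit = nextOffsetsToCommit.getAndSet(null);
		if (commit != null) {
			// The real code would issue commitAsync(commit.f0, ...) here; the callback
			// always matches the offsets it was handed over with.
			commit.f1.onSuccess();
		}
	}

	public static void main(String[] args) {
		OffsetCommitHandover handover = new OffsetCommitHandover();
		handover.setOffsetsToCommit(new HashMap<>(), () -> System.out.println("committed"));
		handover.maybeCommit();
	}
}
```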
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319780#comment-16319780 ] ASF GitHub Bot commented on FLINK-8306: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 @pnowojski thanks a lot for your insightful review! Regarding the choice of composition or inheritance: actually, in the end I think we should be leaning towards neither of the two, and let offset committing and record fetching live as separate components. I've left more detailed comments inline. > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to simply make that method non-final, that is not ideal since it > would be best that the method is left final to prevent overrides in > subclasses. > This suggests that offset committing functionality is too tightly coupled > with the {{AbstractFetcher}}, making it hard to perform concise tests to > verify offset committing. > I suggest that we decouple record fetching and offset committing as separate > services behind different interfaces. We should introduce a new interface, > say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we > can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8368) Port SubtaskExecutionAttemptDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-8368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319778#comment-16319778 ] ASF GitHub Bot commented on FLINK-8368: --- GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/5270 [FLINK-8368] [REST] Migrate SubtaskExecutionAttemptDetailsHandler to a new REST handler ## What is the purpose of the change * Migrate `org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler` to flip-6 `WebMonitorEndpoint`. ## Brief change log * Introduce abstractions for `JobVertexHandler` and `SubtaskAttemptHandler`. * Add `SubtaskExecutionAttemptDetailsHandler` to the flip-6 REST framework. * Rename inner class `JobVertexMetrics` to public class `IOMetricsInfo`, making it more reusable. ## Verifying this change * This change added unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink FLINK-8368 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5270.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5270 commit 29182f04255f77288e207aef9e7015862d3e9a8c Author: biao.liub Date: 2018-01-10T06:25:07Z [FLINK-8368] Migrate org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler to a new REST handler that is registered in WebMonitorEndpoint > Port SubtaskExecutionAttemptDetailsHandler to new REST endpoint > --- > > Key: FLINK-8368 > URL: https://issues.apache.org/jira/browse/FLINK-8368 > Project: Flink > Issue Type: Sub-task > Components: REST >Reporter: Biao Liu >Assignee: Biao Liu > Labels: flip-6 > Fix For: 1.5.0 > > > Migrate > org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler > to a new REST handler that is registered in WebMonitorEndpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 @pnowojski thanks a lot for your insightful review! Regarding the choice of composition or inheritance: actually, in the end I think we should be leaning towards neither of the two, and let offset committing and record fetching live as separate components. I've left more detailed comments inline. ---
[GitHub] flink pull request #5270: [FLINK-8368] [REST] Migrate SubtaskExecutionAttemp...
GitHub user ifndef-SleePy opened a pull request: https://github.com/apache/flink/pull/5270 [FLINK-8368] [REST] Migrate SubtaskExecutionAttemptDetailsHandler to a new REST handler ## What is the purpose of the change * Migrate `org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler` to flip-6 `WebMonitorEndpoint`. ## Brief change log * Introduce abstractions for `JobVertexHandler` and `SubtaskAttemptHandler`. * Add `SubtaskExecutionAttemptDetailsHandler` to the flip-6 REST framework. * Rename inner class `JobVertexMetrics` to public class `IOMetricsInfo`, making it more reusable. ## Verifying this change * This change added unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink FLINK-8368 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5270.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5270 commit 29182f04255f77288e207aef9e7015862d3e9a8c Author: biao.liub Date: 2018-01-10T06:25:07Z [FLINK-8368] Migrate org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler to a new REST handler that is registered in WebMonitorEndpoint ---
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319766#comment-16319766 ] ASF GitHub Bot commented on FLINK-8306: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r160599433 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -51,7 +50,7 @@ *the Flink data streams. * @param The type of topic/partition identifier used by Kafka in the specific version. */ -public abstract class AbstractFetcher{ +public abstract class AbstractFetcher implements KafkaOffsetCommitter { --- End diff -- The result could also very well be that we should use this opportunity to refactor the vague dependencies between fetcher / consumer thread / consumer base, and include in this PR. I would not be against that. > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to simply make that method non-final, that is not ideal since it > would be best that the method is left final to prevent overrides in > subclasses. > This suggests that offset committing functionality is too tightly coupled > with the {{AbstractFetcher}}, making it hard to perform concise tests to > verify offset committing. > I suggest that we decouple record fetching and offset committing as separate > services behind different interfaces. We should introduce a new interface, > say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we > can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r160599433 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -51,7 +50,7 @@ *the Flink data streams. * @param The type of topic/partition identifier used by Kafka in the specific version. */ -public abstract class AbstractFetcher{ +public abstract class AbstractFetcher implements KafkaOffsetCommitter { --- End diff -- The result could also very well be that we should use this opportunity to refactor the vague dependencies between fetcher / consumer thread / consumer base, and include in this PR. I would not be against that. ---
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319763#comment-16319763 ] ASF GitHub Bot commented on FLINK-8306: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r160598939 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java --- @@ -89,30 +90,44 @@ @SuppressWarnings("unchecked") public void testEitherWatermarkExtractor() { try { - new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false) + new DummyFlinkKafkaConsumer( + mock(AbstractFetcher.class), + mock(AbstractPartitionDiscoverer.class), + mock(KafkaOffsetCommitter.class), + false) .assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null); fail(); } catch (NullPointerException ignored) {} try { - new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false) + new DummyFlinkKafkaConsumer( + mock(AbstractFetcher.class), + mock(AbstractPartitionDiscoverer.class), + mock(KafkaOffsetCommitter.class), + false) .assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null); fail(); } catch (NullPointerException ignored) {} final AssignerWithPeriodicWatermarks periodicAssigner = mock(AssignerWithPeriodicWatermarks.class); final AssignerWithPunctuatedWatermarks punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class); - DummyFlinkKafkaConsumer c1 = - new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false); + DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer<>( + mock(AbstractFetcher.class), + mock(AbstractPartitionDiscoverer.class), + mock(KafkaOffsetCommitter.class), --- End diff -- Will do. > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to simply make that method non-final, that is not ideal since it > would be best that the method is left final to prevent overrides in > subclasses. > This suggests that offset committing functionality is too tightly coupled > with the {{AbstractFetcher}}, making it hard to perform concise tests to > verify offset committing. > I suggest that we decouple record fetching and offset committing as separate > services behind different interfaces. We should introduce a new interface, > say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we > can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r160598939 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java --- @@ -89,30 +90,44 @@ @SuppressWarnings("unchecked") public void testEitherWatermarkExtractor() { try { - new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false) + new DummyFlinkKafkaConsumer( + mock(AbstractFetcher.class), + mock(AbstractPartitionDiscoverer.class), + mock(KafkaOffsetCommitter.class), + false) .assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null); fail(); } catch (NullPointerException ignored) {} try { - new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false) + new DummyFlinkKafkaConsumer( + mock(AbstractFetcher.class), + mock(AbstractPartitionDiscoverer.class), + mock(KafkaOffsetCommitter.class), + false) .assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null); fail(); } catch (NullPointerException ignored) {} final AssignerWithPeriodicWatermarks periodicAssigner = mock(AssignerWithPeriodicWatermarks.class); final AssignerWithPunctuatedWatermarks punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class); - DummyFlinkKafkaConsumer c1 = - new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false); + DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer<>( + mock(AbstractFetcher.class), + mock(AbstractPartitionDiscoverer.class), + mock(KafkaOffsetCommitter.class), --- End diff -- Will do. ---
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319762#comment-16319762 ] ASF GitHub Bot commented on FLINK-8306: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r16059 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -559,6 +565,7 @@ public void onException(Throwable cause) { //the fetchers 'snapshotCurrentState()' method return at least //the restored offsets this.kafkaFetcher = fetcher; + this.kafkaOffsetCommitter = createOffsetCommitter(); --- End diff -- This is a very valid argument. Will address this with a factory perhaps, as you suggested. > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to simply make that method non-final, that is not ideal since it > would be best that the method is left final to prevent overrides in > subclasses. > This suggests that offset committing functionality is too tightly coupled > with the {{AbstractFetcher}}, making it hard to perform concise tests to > verify offset committing. > I suggest that we decouple record fetching and offset committing as separate > services behind different interfaces. We should introduce a new interface, > say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we > can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8306) FlinkKafkaConsumerBaseTest has invalid mocks on final methods
[ https://issues.apache.org/jira/browse/FLINK-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319761#comment-16319761 ] ASF GitHub Bot commented on FLINK-8306: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r160598772 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -51,7 +50,7 @@ *the Flink data streams. * @param The type of topic/partition identifier used by Kafka in the specific version. */ -public abstract class AbstractFetcher{ +public abstract class AbstractFetcher implements KafkaOffsetCommitter { --- End diff -- I agree that composition suits better here, or maybe even neither of the two. However, the reality is that currently the offset committing logic is implemented tightly as part of the `AbstractFetcher`, sharing the same Kafka client for both fetching records and committing offsets. Decoupling that would require further refactoring, which I think is a bit out of scope for the current issue at hand. I have been thinking that we should simply have two separate service implementations for offset committing and record fetching. If that happens, then neither composition nor inheritance is required; offset committing and record fetching simply live as two separate services. What do you think? > FlinkKafkaConsumerBaseTest has invalid mocks on final methods > - > > Key: FLINK-8306 > URL: https://issues.apache.org/jira/browse/FLINK-8306 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The {{FlinkKafkaConsumerBaseTest}} has invalid mocks on a final > {{AbstractFetcher::commitInternalOffsetsToKafka(...)}} method. While an easy > fix would be to simply make that method non-final, that is not ideal since it > would be best that the method is left final to prevent overrides in > subclasses. > This suggests that offset committing functionality is too tightly coupled > with the {{AbstractFetcher}}, making it hard to perform concise tests to > verify offset committing. > I suggest that we decouple record fetching and offset committing as separate > services behind different interfaces. We should introduce a new interface, > say {{KafkaOffsetCommitter}}, and test against that instead. Initially, we > can simply let {{AbstractFetcher}} implement {{KafkaOffsetCommitter}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r16059 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -559,6 +565,7 @@ public void onException(Throwable cause) { //the fetchers 'snapshotCurrentState()' method return at least //the restored offsets this.kafkaFetcher = fetcher; + this.kafkaOffsetCommitter = createOffsetCommitter(); --- End diff -- This is a very valid argument. Will address this with a factory perhaps, as you suggested. ---
[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5200#discussion_r160598772 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -51,7 +50,7 @@ *the Flink data streams. * @param The type of topic/partition identifier used by Kafka in the specific version. */ -public abstract class AbstractFetcher{ +public abstract class AbstractFetcher implements KafkaOffsetCommitter { --- End diff -- I agree that composition suits better here, or maybe even neither of the two. However, the reality is that currently the offset committing logic is implemented tightly as part of the `AbstractFetcher`, sharing the same Kafka client for both fetching records and committing offsets. Decoupling that would require further refactoring, which I think is a bit out of scope for the current issue at hand. I have been thinking that we should simply have two separate service implementations for offset committing and record fetching. If that happens, then neither composition nor inheritance is required; offset committing and record fetching simply live as two separate services. What do you think? ---
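For reference, the interface proposed in the issue description ("say, {{KafkaOffsetCommitter}}") could look roughly like the sketch below. The method mirrors the existing `AbstractFetcher::commitInternalOffsetsToKafka(...)`, and the callback parameter assumes the commit callback type the connector already uses for commit notifications; the exact shape is up to the pull request.

```java
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

/**
 * Sketch of the proposed interface: the consumer base only needs a way to hand
 * checkpointed offsets back to Kafka, independent of how records are fetched.
 */
public interface KafkaOffsetCommitter {

	/**
	 * Commits the given partition offsets back to Kafka and invokes the callback
	 * once the commit attempt completes.
	 */
	void commitInternalOffsetsToKafka(
			Map<KafkaTopicPartition, Long> offsets,
			KafkaCommitCallback commitCallback) throws Exception;
}
```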
[jira] [Commented] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages
[ https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319754#comment-16319754 ] ASF GitHub Bot commented on FLINK-6004: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5269 [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip non-deserializable records ## What is the purpose of the change This PR is based on #5268, which includes fixes to harden Kinesis unit tests. Only the last commit is relevant. In the past, we allowed the Flink Kafka Consumer to skip corrupted Kafka records which cannot be deserialized. In real pipelines, it is entirely normal that this could happen. This PR adds this functionality to the Flink Kinesis Consumer also. ## Brief change log - Clarify in Javadoc of `KinesisDeserializationSchema` that `null` can be returned if a message cannot be deserialized. - If `record` is `null` in `KinesisDataFetcher::emitRecordAndUpdateState(...)`, do not collect any output for the record. - Add `KinesisDataFetcherTest::testSkipCorruptedRecord()` to verify the feature. ## Verifying this change The additional `KinesisDataFetcherTest::testSkipCorruptedRecord()` test verifies this change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-6004 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5269.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5269 commit 94b45919afa5a3ec3ce68c45e57f7989397f9640 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T02:11:31Z [FLINK-8398] [kinesis, tests] Cleanup confusing implementations in KinesisDataFetcherTest and related classes The previous implementation of the TestableKinesisDataFetcher was confusing in various ways, making it hard to re-use for other tests. This commit contains the following cleanups: - Remove confusing mocks of source context and checkpoint lock. We now allow users of the TestableKinesisDataFetcher to provide a source context, which should provide the checkpoint lock. - Remove override of emitRecordAndUpdateState(). Strictly speaking, that method should be final. It was previously overridden to allow verifying how many records were output by the fetcher. That verification would be better implemented within a mock source context. - Properly parameterize the output type for the TestableKinesisDataFetcher. - Remove use of PowerMockito in KinesisDataFetcherTest. - Use CheckedThreads to properly capture any exceptions in fetcher / consumer threads in unit tests. - Use assertEquals / assertNull instead of assertTrue wherever appropriate.
commit 547d19f9196512231661f427f3792f2e1f831339 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T05:41:49Z [FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests Prior to this commit, several unit tests in KinesisDataFetcherTest relied on sleeps to wait until a certain operation happens, in order for the test to pass. This commit removes those sleeps and replaces the test behaviours with OneShotLatches. commit 8d2b086ae7133999ec03620e3434cd659fd8d9d3 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T06:04:10Z [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip records This commit acknowledges that null can be returned from the deserialization schema, if the message cannot be deserialized. If null is returned for a Kinesis record, no output is produced for that record, while the sequence number in the shard state is still advanced so that the record is effectively accounted as processed. > Allow FlinkKinesisConsumer to skip corrupted messages >
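The sleep-to-latch replacement described in the second commit follows a common Flink test pattern: the test thread blocks on a `OneShotLatch` until the code under test signals the event, instead of sleeping for an arbitrary time and hoping the event has already happened. An illustrative example, not the actual KinesisDataFetcherTest code:

```java
import org.apache.flink.core.testutils.OneShotLatch;

import org.junit.Test;

public class LatchInsteadOfSleepExample {

	@Test
	public void testWaitsForStartupDeterministically() throws Exception {
		final OneShotLatch fetcherStarted = new OneShotLatch();

		Thread fetcherThread = new Thread(() -> {
			// ... start the component under test here ...
			fetcherStarted.trigger();   // signal the event instead of relying on timing
		});
		fetcherThread.start();

		fetcherStarted.await();         // returns as soon as trigger() was called
		// ... assertions that require the component to be running go here ...

		fetcherThread.join();
	}
}
```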
[GitHub] flink pull request #5269: [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer ...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5269 [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip non-deserializable records ## What is the purpose of the change This PR is based on #5268, which includes fixes to harden Kinesis unit tests. Only the last commit is relevant. In the past, we allowed the Flink Kafka Consumer to skip corrupted Kafka records which cannot be deserialized. In real-world pipelines, it is entirely normal for this to happen. This PR adds the same functionality to the Flink Kinesis Consumer. ## Brief change log - Clarify in the Javadoc of `KinesisDeserializationSchema` that `null` can be returned if a message cannot be deserialized. - If `record` is `null` in `KinesisDataFetcher::emitRecordAndUpdateState(...)`, do not collect any output for the record. - Add `KinesisDataFetcherTest::testSkipCorruptedRecord()` to verify the feature. ## Verifying this change The additional `KinesisDataFetcherTest::testSkipCorruptedRecord()` test verifies this change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-6004 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5269.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5269 commit 94b45919afa5a3ec3ce68c45e57f7989397f9640 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T02:11:31Z [FLINK-8398] [kinesis, tests] Cleanup confusing implementations in KinesisDataFetcherTest and related classes The previous implementation of the TestableKinesisDataFetcher was confusing in various ways, making it hard to reuse for other tests. This commit contains the following cleanups: - Remove confusing mocks of source context and checkpoint lock. We now allow users of the TestableKinesisDataFetcher to provide a source context, which should provide the checkpoint lock. - Remove override of emitRecordAndUpdateState(). Strictly speaking, that method should be final. It was previously overridden to allow verifying how many records were output by the fetcher. That verification would be better implemented within a mock source context. - Properly parameterize the output type for the TestableKinesisDataFetcher. - Remove use of PowerMockito in KinesisDataFetcherTest. - Use assertEquals / assertNull instead of assertTrue wherever appropriate. - Use CheckedThreads to properly capture any exceptions in fetcher / consumer threads in unit tests. 
commit 547d19f9196512231661f427f3792f2e1f831339 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T05:41:49Z [FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests Prior to this commit, several unit tests in KinesisDataFetcherTest relied on sleeps to wait until a certain operation happens, in order for the test to pass. This commit removes those sleeps and replaces the test behaviours with OneShotLatches. commit 8d2b086ae7133999ec03620e3434cd659fd8d9d3 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T06:04:10Z [FLINK-6004] [kinesis] Allow FlinkKinesisConsumer to skip records This commit acknowledges that null can be returned from the deserialization schema, if the message cannot be deserialized. If null is returned for a Kinesis record, no output is produced for that record, while the sequence number in the shard state is still advanced so that the record is effectively accounted as processed. ---
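For illustration, a user-facing deserialization schema that opts into the skip behaviour described above could look roughly like the following sketch. The interface and its method signature are assumed from the Kinesis connector of this era; the class name and the `startsWith` validity check are purely illustrative.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

import java.nio.charset.StandardCharsets;

// Returning null from deserialize() tells the consumer to skip the record; per the change
// described above, the shard sequence number still advances so the record is not re-read.
public class SkipCorruptedStringSchema implements KinesisDeserializationSchema<String> {

	@Override
	public String deserialize(byte[] recordValue, String partitionKey, String seqNum,
			long approxArrivalTimestamp, String stream, String shardId) {
		String value = new String(recordValue, StandardCharsets.UTF_8);
		// Illustrative corruption check: treat anything that is not a JSON object as corrupted.
		return value.startsWith("{") ? value : null;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}
```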
[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests
[ https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319741#comment-16319741 ] ASF GitHub Bot commented on FLINK-8398: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5268 [FLINK-8398] [kinesis, tests] Harden KinesisDataFetcherTest unit tests ## What is the purpose of the change Prior to this PR, many of the `KinesisDataFetcherTest` unit tests relied on thread sleeps to wait until a certain operation occurs to allow the test to pass. This test behaviour is very flaky and should be replaced with `OneShotLatch`. ## Brief change log - 94b4591: Several minor cleanups of confusing implementations / code smells in the `KinesisDataFetcherTest` and related test classes. The commit message explains what exactly was changed. - 547d19f: Remove thread sleeps in unit tests and replace them with `OneShotLatch`. ## Verifying this change No test coverage should have been affected by this change. The existing tests in `KinesisDataFetcherTest` verify this. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / *no*) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / *no*) - The serializers: (yes / *no* / don't know) - The runtime per-record code paths (performance sensitive): (yes / *no* / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know) - The S3 file system connector: (yes / *no* / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / *no*) - If yes, how is the feature documented? (*not applicable* / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5268.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5268 commit 94b45919afa5a3ec3ce68c45e57f7989397f9640 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T02:11:31Z [FLINK-8398] [kinesis, tests] Cleanup confusing implementations in KinesisDataFetcherTest and related classes The previous implementation of the TestableKinesisDataFetcher was confusing in various ways, making it hard to reuse for other tests. This commit contains the following cleanups: - Remove confusing mocks of source context and checkpoint lock. We now allow users of the TestableKinesisDataFetcher to provide a source context, which should provide the checkpoint lock. - Remove override of emitRecordAndUpdateState(). Strictly speaking, that method should be final. It was previously overridden to allow verifying how many records were output by the fetcher. That verification would be better implemented within a mock source context. - Properly parameterize the output type for the TestableKinesisDataFetcher. - Remove use of PowerMockito in KinesisDataFetcherTest. - Use CheckedThreads to properly capture any exceptions in fetcher / consumer threads in unit tests. - Use assertEquals / assertNull instead of assertTrue wherever appropriate. 
commit 547d19f9196512231661f427f3792f2e1f831339 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T05:41:49Z [FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests Prior to this commit, several unit tests in KinesisDataFetcherTest relied on sleeps to wait until a certain operation happens, in order for the test to pass. This commit removes those sleeps and replaces the test behaviours with OneShotLatches. > Stabilize flaky KinesisDataFetcherTests > --- > > Key: FLINK-8398 > URL: https://issues.apache.org/jira/browse/FLINK-8398 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.5.0, 1.4.1 > > > The unit tests in {{KinesisDataFetcherTest}} have very flaky implementations. > They rely on on thread sleeps to wait for a certain operation to happen, > which can easily miss and cause tests to fail. > Although there isn't any reports of consistent failures on these tests yet > (as far as I am aware of), they can easily surface in the future.
[GitHub] flink pull request #5268: [FLINK-8398] [kinesis, tests] Harden KinesisDataFe...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5268 [FLINK-8398] [kinesis, tests] Harden KinesisDataFetcherTest unit tests ## What is the purpose of the change Prior to this PR, many of the `KinesisDataFetcherTest` unit tests relied on thread sleeps to wait until a certain operation occurs to allow the test to pass. This test behaviour is very flaky and should be replaced with `OneShotLatch`. ## Brief change log - 94b4591: Several minor cleanups of confusing implementations / code smells in the `KinesisDataFetcherTest` and related test classes. The commit message explains what exactly was changed. - 547d19f: Remove thread sleeps in unit tests and replace them with `OneShotLatch`. ## Verifying this change No test coverage should have been affected by this change. The existing tests in `KinesisDataFetcherTest` verify this. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / *no*) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / *no*) - The serializers: (yes / *no* / don't know) - The runtime per-record code paths (performance sensitive): (yes / *no* / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know) - The S3 file system connector: (yes / *no* / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / *no*) - If yes, how is the feature documented? (*not applicable* / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8398 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5268.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5268 commit 94b45919afa5a3ec3ce68c45e57f7989397f9640 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T02:11:31Z [FLINK-8398] [kinesis, tests] Cleanup confusing implementations in KinesisDataFetcherTest and related classes The previous implementation of the TestableKinesisDataFetcher was confusing in various ways, making it hard to reuse for other tests. This commit contains the following cleanups: - Remove confusing mocks of source context and checkpoint lock. We now allow users of the TestableKinesisDataFetcher to provide a source context, which should provide the checkpoint lock. - Remove override of emitRecordAndUpdateState(). Strictly speaking, that method should be final. It was previously overridden to allow verifying how many records were output by the fetcher. That verification would be better implemented within a mock source context. - Properly parameterize the output type for the TestableKinesisDataFetcher. - Remove use of PowerMockito in KinesisDataFetcherTest. - Use CheckedThreads to properly capture any exceptions in fetcher / consumer threads in unit tests. - Use assertEquals / assertNull instead of assertTrue wherever appropriate. commit 547d19f9196512231661f427f3792f2e1f831339 Author: Tzu-Li (Gordon) Tai Date: 2018-01-10T05:41:49Z [FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests Prior to this commit, several unit tests in KinesisDataFetcherTest relied on sleeps to wait until a certain operation happens, in order for the test to pass. 
This commit removes those sleeps and replaces the test behaviours with OneShotLatches. ---
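As a rough sketch of the pattern these commits move to, using Flink's `OneShotLatch` test utility; the worker thread and the `doSomeWork()` placeholder below are illustrative, not the actual test code:

```java
import org.apache.flink.core.testutils.OneShotLatch;
import org.junit.Test;

// The worker thread triggers the latch when the awaited operation has happened,
// and the test blocks on the latch instead of sleeping for a fixed amount of time.
public class LatchInsteadOfSleepTest {

	@Test
	public void testWaitsForOperationWithoutSleeping() throws Exception {
		final OneShotLatch operationHappened = new OneShotLatch();

		Thread worker = new Thread(() -> {
			doSomeWork();                  // hypothetical operation under test
			operationHappened.trigger();   // signal completion instead of relying on timing
		});
		worker.start();

		operationHappened.await();         // deterministic wait, no Thread.sleep()
		worker.join();
	}

	private static void doSomeWork() {
		// placeholder for the operation the real tests wait for
	}
}
```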
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319691#comment-16319691 ] ASF GitHub Bot commented on FLINK-6951: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4150 Merging to `release-1.3` .. > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Ted Yu >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for Flink kinesis connector : > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building Flink kinesis connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4150 Merging to `release-1.3` .. ---
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319690#comment-16319690 ] ASF GitHub Bot commented on FLINK-6951: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4150 @casidiablo thanks for the info! I'll merge this for `release-1.3` then, and will keep an extra eye on whether the problem still occurs for 1.4 / 1.5. > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Ted Yu >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for Flink kinesis connector : > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building Flink kinesis connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4150 @casidiablo thanks for the info! I'll merge this for `release-1.3` then, and will keep an extra eye on whether the problem still occurs for 1.4 / 1.5. ---
[jira] [Created] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests
Tzu-Li (Gordon) Tai created FLINK-8398: -- Summary: Stabilize flaky KinesisDataFetcherTests Key: FLINK-8398 URL: https://issues.apache.org/jira/browse/FLINK-8398 Project: Flink Issue Type: Bug Components: Kinesis Connector, Tests Affects Versions: 1.4.0, 1.5.0 Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.5.0, 1.4.1 The unit tests in {{KinesisDataFetcherTest}} have very flaky implementations. They rely on thread sleeps to wait for a certain operation to happen, which can easily miss the intended timing and cause tests to fail. Although there are no reports of consistent failures in these tests yet (as far as I am aware), they can easily surface in the future. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7795) Utilize error-prone to discover common coding mistakes
[ https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299208#comment-16299208 ] Ted Yu edited comment on FLINK-7795 at 1/10/18 4:08 AM: https://github.com/google/error-prone/releases/tag/v2.2.0 was the latest release. was (Author: yuzhih...@gmail.com): https://github.com/google/error-prone/releases/tag/v2.1.3 was the latest release. > Utilize error-prone to discover common coding mistakes > -- > > Key: FLINK-7795 > URL: https://issues.apache.org/jira/browse/FLINK-7795 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu > > http://errorprone.info/ is a tool which detects common coding mistakes. > We should incorporate it into the Flink build process. > Here are the dependencies: > {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319632#comment-16319632 ] ASF GitHub Bot commented on FLINK-4816: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/4828 Hi @StephanEwen Let me summarize your comment and clarify some questions in my mind. 1. The original design treated all failures in DEPLOYING as restore failures. That is not quite right, because a failed restore is just one of the possible causes. 2. Using `last restored checkpoint ID` to record the latest id is not the proper way. Maybe I need to put it in the state object. Am I right? 3. A better solution might be to track all failures in the TaskManager and only report those failures related to restore as restore failures, then wrap them with the current checkpoint id and send them back to the JobManager. Do I misunderstand something? Or is there anything else that I didn't mention? > Executions failed from "DEPLOYING" should retain restored checkpoint > information > > > Key: FLINK-4816 > URL: https://issues.apache.org/jira/browse/FLINK-4816 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Wei-Che Wei > > When an execution fails from state {{DEPLOYING}}, it should wrap the failure > to better report the failure cause: > - If no checkpoint was restored, it should wrap the exception in a > {{DeployTaskException}} > - If a checkpoint was restored, it should wrap the exception in a > {{RestoreTaskException}} and record the id of the checkpoint that was > attempted to be restored. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4828: [FLINK-4816] [checkpoints] Executions failed from "DEPLOY...
Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/4828 Hi @StephanEwen Let me summarize your comment and clarify some questions in my mind. 1. The original design treated all failures in DEPLOYING as restore failures. That is not quite right, because a failed restore is just one of the possible causes. 2. Using `last restored checkpoint ID` to record the latest id is not the proper way. Maybe I need to put it in the state object. Am I right? 3. A better solution might be to track all failures in the TaskManager and only report those failures related to restore as restore failures, then wrap them with the current checkpoint id and send them back to the JobManager. Do I misunderstand something? Or is there anything else that I didn't mention? ---
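For reference, a rough sketch of the failure wrapping the ticket description asks for. The two exception classes are the ones proposed in the issue (they did not exist yet at the time of this discussion); everything else below is illustrative.

```java
// Illustrative only: wraps a failure that occurred while the execution was in DEPLOYING,
// distinguishing plain deployment failures from failures during state restore.
class DeployTaskException extends Exception {
	DeployTaskException(Throwable cause) {
		super("Failed to deploy task", cause);
	}
}

class RestoreTaskException extends Exception {
	private final long checkpointId;

	RestoreTaskException(long checkpointId, Throwable cause) {
		super("Failed to restore task state from checkpoint " + checkpointId, cause);
		this.checkpointId = checkpointId;
	}

	long getCheckpointId() {
		return checkpointId;
	}
}

class DeployingFailureWrapper {
	// restoredCheckpointId is null if no checkpoint was being restored
	static Exception wrap(Throwable cause, Long restoredCheckpointId) {
		if (restoredCheckpointId == null) {
			return new DeployTaskException(cause);            // plain deployment problem
		} else {
			return new RestoreTaskException(restoredCheckpointId, cause); // record the attempted checkpoint
		}
	}
}
```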
[jira] [Assigned] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages
[ https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-6004: -- Assignee: Tzu-Li (Gordon) Tai (was: ChungChe Lai) > Allow FlinkKinesisConsumer to skip corrupted messages > - > > Key: FLINK-6004 > URL: https://issues.apache.org/jira/browse/FLINK-6004 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > It is quite clear from the fix of FLINK-3679 that in reality, users might > encounter corrupted messages from Kafka / Kinesis / generally external > sources when deserializing them. > The consumers should support simply skipping those messages, by letting the > deserialization schema return {{null}}, and checking {{null}} values within > the consumer. > This has been done for the Kafka consumer already. This ticket tracks the > improvement for the Kinesis consumer. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6004) Allow FlinkKinesisConsumer to skip corrupted messages
[ https://issues.apache.org/jira/browse/FLINK-6004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319572#comment-16319572 ] Tzu-Li (Gordon) Tai commented on FLINK-6004: Since it has been quite a while since this issue was picked up, I assume that it is currently inactive. I will pick this up. > Allow FlinkKinesisConsumer to skip corrupted messages > - > > Key: FLINK-6004 > URL: https://issues.apache.org/jira/browse/FLINK-6004 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: ChungChe Lai > > It is quite clear from the fix of FLINK-3679 that in reality, users might > encounter corrupted messages from Kafka / Kinesis / generally external > sources when deserializing them. > The consumers should support simply skipping those messages, by letting the > deserialization schema return {{null}}, and checking {{null}} values within > the consumer. > This has been done for the Kafka consumer already. This ticket tracks the > improvement for the Kinesis consumer. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei-Che Wei closed FLINK-5982. -- Resolution: Done Fix Version/s: 1.5.0 > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > Fix For: 1.5.0 > > > Currently, running an invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it needs to call {{setInitialState(state)}}. That makes it > difficult to do the eager initialization on the invokable during the > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor the abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two-argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclasses > #* Make all subclasses of {{AbstractInvokable}} have a two-argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} has become stateful.) > # Change the creation of the invokable to call that constructor, and update all > the tests. > Then, we can simplify the logic to run an invokable by using the constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill requirements such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319532#comment-16319532 ] ASF GitHub Bot commented on FLINK-5982: --- Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3633 Thanks a lot for doing this work. Nice to see this PR get merged. =) > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running an invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it needs to call {{setInitialState(state)}}. That makes it > difficult to do the eager initialization on the invokable during the > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity > for running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor the abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two-argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclasses > #* Make all subclasses of {{AbstractInvokable}} have a two-argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} has become stateful.) > # Change the creation of the invokable to call that constructor, and update all > the tests. > Then, we can simplify the logic to run an invokable by using the constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill requirements such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3633: [FLINK-5982] [runtime] Refactor AbstractInvokable and Sta...
Github user tony810430 commented on the issue: https://github.com/apache/flink/pull/3633 Thanks a lot for doing this work. Nice to see this PR get merged. =) ---
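A condensed sketch of the constructor-based pattern the ticket describes (heavily simplified and not the actual Flink classes; `Environment` and `TaskStateHandles` are the runtime types named in the ticket, the subclass is illustrative):

```java
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.TaskStateHandles;

// Instead of setEnvironment(env) / setInitialState(state), everything an invokable
// needs is handed over through its constructor, so eager initialization can happen
// while the execution is still in DEPLOYING.
abstract class AbstractInvokable {
	protected final Environment environment;
	protected final TaskStateHandles initialState;   // may be null for stateless / batch tasks

	protected AbstractInvokable(Environment environment, TaskStateHandles initialState) {
		this.environment = environment;
		this.initialState = initialState;
	}

	public abstract void invoke() throws Exception;
}

class MyStreamTask extends AbstractInvokable {
	MyStreamTask(Environment environment, TaskStateHandles initialState) {
		super(environment, initialState);
		// eager initialization can now be placed here
	}

	@Override
	public void invoke() throws Exception {
		// run the task
	}
}
```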
[jira] [Created] (FLINK-8397) Support ROW type in CassandraOutputFormat
Shuyi Chen created FLINK-8397: - Summary: Support ROW type in CassandraOutputFormat Key: FLINK-8397 URL: https://issues.apache.org/jira/browse/FLINK-8397 Project: Flink Issue Type: Improvement Environment: Currently, only tuple is supported. Reporter: Shuyi Chen Assignee: Shuyi Chen -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319445#comment-16319445 ] ASF GitHub Bot commented on FLINK-6951: --- Github user casidiablo commented on the issue: https://github.com/apache/flink/pull/4150 Since EMR only supports Flink 1.3 I had to checkout release-1.3 and compile the connector from there. Then I was getting this `Socket not created by this factory` error. I then patched 1.3 to include these changes, and that fixed it. The current master version can't be used because so many things have changed since then, and it does not seem to be compatible with EMR right now. > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Ted Yu >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for Flink kinesis connector : > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building Flink kinesis connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...
Github user casidiablo commented on the issue: https://github.com/apache/flink/pull/4150 Since EMR only supports Flink 1.3, I had to check out release-1.3 and compile the connector from there. Then I was getting this `Socket not created by this factory` error. I then patched 1.3 to include these changes, and that fixed it. The current master version can't be used because so many things have changed since then, and it does not seem to be compatible with EMR right now. ---
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319441#comment-16319441 ] ASF GitHub Bot commented on FLINK-6951: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4150 @casidiablo did you mean that without applying this PR's patch, the current master worked for you? Or you had to apply this patch in order for it to work? > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Ted Yu >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for Flink kinesis connector : > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building Flink kinesis connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4150 @casidiablo did you mean that without applying this PR's patch, the current master worked for you? Or you had to apply this patch in order for it to work? ---
[jira] [Updated] (FLINK-8396) Create (derived) duplicate Buffer class
[ https://issues.apache.org/jira/browse/FLINK-8396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8396: --- Description: In order to pass a single buffer to netty multiple times, we require a duplicate Buffer instance with a shared {{MemorySegment}} and reference counting but separate indices. This should be read-only. (was: In order to pass a single buffer to netty multiple times, we require a duplicate Buffer instance with a shared {{MemorySegment}} and reference counting but separate indices.) > Create (derived) duplicate Buffer class > --- > > Key: FLINK-8396 > URL: https://issues.apache.org/jira/browse/FLINK-8396 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber > > In order to pass a single buffer to netty multiple times, we require a > duplicate Buffer instance with a shared {{MemorySegment}} and reference > counting but separate indices. This should be read-only. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8396) Create (derived) duplicate Buffer class
Nico Kruber created FLINK-8396: -- Summary: Create (derived) duplicate Buffer class Key: FLINK-8396 URL: https://issues.apache.org/jira/browse/FLINK-8396 Project: Flink Issue Type: Sub-task Components: Network Reporter: Nico Kruber Assignee: Nico Kruber In order to pass a single buffer to netty multiple times, we require a duplicate Buffer instance with a shared {{MemorySegment}} and reference counting but separate indices. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8395) Create (derived) sliced Buffer class
[ https://issues.apache.org/jira/browse/FLINK-8395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8395: --- Description: In order to pass sub-regions of a single buffer separately, we require a sliced Buffer instance with a shared {{MemorySegment}} and reference counting but separate indices. This should be read-only. (was: In order to pass sub-regions of a single buffer separately, we require a sliced Buffer instance.) > Create (derived) sliced Buffer class > > > Key: FLINK-8395 > URL: https://issues.apache.org/jira/browse/FLINK-8395 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber > > In order to pass sub-regions of a single buffer separately, we require a > sliced Buffer instance with a shared {{MemorySegment}} and reference counting > but separate indices. This should be read-only. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8395) Create (derived) sliced Buffer class
Nico Kruber created FLINK-8395: -- Summary: Create (derived) sliced Buffer class Key: FLINK-8395 URL: https://issues.apache.org/jira/browse/FLINK-8395 Project: Flink Issue Type: Sub-task Components: Network Reporter: Nico Kruber Assignee: Nico Kruber In order to pass sub-regions of a single buffer separately, we require a sliced Buffer instance. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
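A compact sketch of the idea behind these two tickets (shared {{MemorySegment}} and reference count, separate indices, read-only views); the class below is illustrative and not the actual network buffer implementation:

```java
import org.apache.flink.core.memory.MemorySegment;

import java.util.concurrent.atomic.AtomicInteger;

// A read-only view that shares the underlying MemorySegment and reference count with
// its parent buffer but keeps its own offset, size, and reader index.
final class ReadOnlyBufferView {
	private final MemorySegment segment;     // shared with the parent buffer
	private final AtomicInteger refCount;    // shared reference counting
	private final int offset;                // view-local start within the segment
	private final int size;                  // view-local length
	private int readerIndex;                 // view-local read position

	private ReadOnlyBufferView(MemorySegment segment, AtomicInteger refCount, int offset, int size) {
		this.segment = segment;
		this.refCount = refCount;
		this.offset = offset;
		this.size = size;
		refCount.incrementAndGet();          // the view retains the shared segment
	}

	/** Full-length duplicate: shared memory and ref count, independent indices. */
	static ReadOnlyBufferView duplicate(MemorySegment segment, AtomicInteger refCount, int size) {
		return new ReadOnlyBufferView(segment, refCount, 0, size);
	}

	/** Sliced view over a sub-region of the same segment. */
	ReadOnlyBufferView slice(int index, int length) {
		return new ReadOnlyBufferView(segment, refCount, offset + index, length);
	}

	byte readByte() {
		return segment.get(offset + readerIndex++);
	}

	void release() {
		refCount.decrementAndGet();          // recycling the segment is left out of this sketch
	}
}
```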
[jira] [Commented] (FLINK-7511) Remove dead code after dropping backward compatibility with <=1.2
[ https://issues.apache.org/jira/browse/FLINK-7511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319170#comment-16319170 ] ASF GitHub Bot commented on FLINK-7511: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4587#discussion_r160524740 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -150,16 +126,13 @@ */ private boolean nfaChanged; - public NFA( - final TypeSerializer eventSerializer, + public NFA(final TypeSerializer eventSerializer, final long windowTime, final boolean handleTimeout) { - this.eventSerializer = eventSerializer; --- End diff -- The difference in handling `null` seems strange to me. Could you tell me how you checked it? Anyway, as I already said, I will add the check. > Remove dead code after dropping backward compatibility with <=1.2 > - > > Key: FLINK-7511 > URL: https://issues.apache.org/jira/browse/FLINK-7511 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4587: [FLINK-7511] [cep] Remove dead code after dropping...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4587#discussion_r160524740 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -150,16 +126,13 @@ */ private boolean nfaChanged; - public NFA( - final TypeSerializer eventSerializer, + public NFA(final TypeSerializer eventSerializer, final long windowTime, final boolean handleTimeout) { - this.eventSerializer = eventSerializer; --- End diff -- The difference in handling `null` seems strange to me. Could you tell me how you checked it? Anyway, as I already said, I will add the check. ---
[jira] [Commented] (FLINK-7511) Remove dead code after dropping backward compatibility with <=1.2
[ https://issues.apache.org/jira/browse/FLINK-7511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319158#comment-16319158 ] ASF GitHub Bot commented on FLINK-7511: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4587#discussion_r160523676 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java --- @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.pattern; - -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * A filter function which combines two filter functions with a logical and. Thus, the filter - * function only returns true, iff both filters return true. - * - * @param Type of the element to filter - * @deprecated This is only used when migrating from an older Flink version. - * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} instead. - */ -@Deprecated -public class AndFilterFunction implements FilterFunction { --- End diff -- I am afraid your worries are justified :( I analyzed the code once again and indeed it is possible to have a checkpoint taken in 1.3.x that has serialized `*FilterFunction` classes through the `FilterWrapper` class. It is possible when the job was previously restored from 1.2.x checkpoint. Unfortunately I will need to restore those classes. I will do that tomorrow and I will also add test for that case. Thanks for catching that! > Remove dead code after dropping backward compatibility with <=1.2 > - > > Key: FLINK-7511 > URL: https://issues.apache.org/jira/browse/FLINK-7511 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.4.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4587: [FLINK-7511] [cep] Remove dead code after dropping...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4587#discussion_r160523676 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java --- @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.pattern; - -import org.apache.flink.api.common.functions.FilterFunction; - -/** - * A filter function which combines two filter functions with a logical and. Thus, the filter - * function only returns true, iff both filters return true. - * - * @param Type of the element to filter - * @deprecated This is only used when migrating from an older Flink version. - * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} instead. - */ -@Deprecated -public class AndFilterFunction implements FilterFunction { --- End diff -- I am afraid your worries are justified :( I analyzed the code once again and indeed it is possible to have a checkpoint taken in 1.3.x that has serialized `*FilterFunction` classes through the `FilterWrapper` class. It is possible when the job was previously restored from 1.2.x checkpoint. Unfortunately I will need to restore those classes. I will do that tomorrow and I will also add test for that case. Thanks for catching that! ---
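The Javadoc of the removed class, still visible in the diff above, describes a filter that combines two filters with a logical AND. As a plain illustration of that contract (simplified, not the original or restored implementation), such a combining filter looks roughly like this:

```java
import org.apache.flink.api.common.functions.FilterFunction;

// Returns true only if both wrapped filters return true.
public class AndFilter<T> implements FilterFunction<T> {
	private final FilterFunction<T> left;
	private final FilterFunction<T> right;

	public AndFilter(FilterFunction<T> left, FilterFunction<T> right) {
		this.left = left;
		this.right = right;
	}

	@Override
	public boolean filter(T value) throws Exception {
		return left.filter(value) && right.filter(value);
	}
}
```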
[jira] [Updated] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
[ https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-8394: -- Issue Type: Test (was: Bug) > Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown > --- > > Key: FLINK-8394 > URL: https://issues.apache.org/jira/browse/FLINK-8394 > Project: Flink > Issue Type: Test >Reporter: Ted Yu >Priority: Minor > > {code} > public void shutdown() { > running = false; > interrupt(); > expectedRecord.complete(0L); > {code} > Access to expectedRecord should be protected by synchronization, as done on > other methods. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
Ted Yu created FLINK-8394: - Summary: Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown Key: FLINK-8394 URL: https://issues.apache.org/jira/browse/FLINK-8394 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor {code} public void shutdown() { running = false; interrupt(); expectedRecord.complete(0L); {code} Access to expectedRecord should be protected by synchronization, as done on other methods. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
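A minimal sketch of the synchronized variant the ticket suggests; the field and method names follow the snippet above, while the rest of the class (the future's type, the accessor method) is assumed for illustration:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: expectedRecord is accessed under the object lock, matching
// the synchronized methods that also touch it.
class ReceiverThread extends Thread {
	private volatile boolean running = true;
	private CompletableFuture<Long> expectedRecord = new CompletableFuture<>();

	public synchronized CompletableFuture<Long> setExpectedRecord(long expected) {
		expectedRecord = new CompletableFuture<>();
		return expectedRecord;
	}

	public void shutdown() {
		running = false;
		interrupt();
		synchronized (this) {
			expectedRecord.complete(0L);
		}
	}
}
```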
[jira] [Commented] (FLINK-8393) Reconnect to last known JobMaster when connection is lost
[ https://issues.apache.org/jira/browse/FLINK-8393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319051#comment-16319051 ] ASF GitHub Bot commented on FLINK-8393: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5267 [FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost ## What is the purpose of the change Reconnect to the last known location of a lost `JobMaster` connection. ## Brief change log - In case of a heartbeat timeout or a disconnect call, the `TaskExecutor` tries to reconnect to the last known `JobMaster` location ## Verifying this change - Added `RegisteredRpcConnection#testReconnect` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink resumeLostJobMasterConnection Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5267.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5267 commit f9bccf20b046e4f73a52ca2a4b842ca985dfaaa8 Author: Till RohrmannDate: 2018-01-09T16:50:37Z [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now completed from the AkkaRpcActor#postStop method. commit 21524394bc37372dc13eb5c3938de051cbd6f03e Author: Till Rohrmann Date: 2018-01-08T17:23:27Z [FLINK-7910] [tests] Generalize Test(Stream)Environment to use JobExecutor This commit introduces the JobExecutor interface which abstracts the actual mini cluster from the Test(Stream)Environment. By letting the Flip-6 MiniCluster as well as the FlinkMiniCluster implement this interface, we can run all test base jobs either on the Flip-6 mini cluster or on the current mini cluster. This closes #4897. 
commit 5f1bdc1a8546e24e079753f92f22a397ccba24de Author: Till Rohrmann Date: 2017-12-01T14:02:09Z [FLINK-8389] [flip6] Release all slots upon closing of JobManager connection commit 141f21d85f0047e4e5c0776e70ac6d83a03e5943 Author: Till Rohrmann Date: 2018-01-09T13:11:20Z [hotfix] Add retrieval of key sets to DualKeyMap commit d46ba9c5f6c1320d73b4d0e65462bcf2c45ff28f Author: Till Rohrmann Date: 2017-12-01T14:10:46Z [hotfix] Enable checkpointing RPC calls commit 737676e5912a2b43dd195ab2a940bc15af6630fb Author: Till Rohrmann Date: 2018-01-09T08:28:34Z [hotfix] Add JavaDocs to OnCompletionActions commit f456248a2e75da4947cea7f2d863db129f0efc5f Author: Till Rohrmann Date: 2018-01-09T15:44:59Z [hotfix] Refactor JobMasterTest to avoid using Mockito commit fa7b667a196980e05194b88ba352a12f330b5ad0 Author: Till Rohrmann Date: 2018-01-09T19:37:08Z [FLINK-8393] Reconnect to last known JobMaster when connection is lost > Reconnect to last known JobMaster when connection is lost > - > > Key: FLINK-8393 > URL: https://issues.apache.org/jira/browse/FLINK-8393 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In case of a connection loss to the {{JobMaster}}, e.g. due to a heartbeat > timeout or a disconnect call, then the {{TaskExecutor}} should try to > reconnect to the last known {{JobMaster}} location in case that the timeout > was a false positive. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5267: [FLINK-8393] [flip6] Reconnect to last known JobMa...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5267 [FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost ## What is the purpose of the change Reconnect to the last known location of a lost `JobMaster` connection. ## Brief change log - In case of a heartbeat timeout or a disconnect call, the `TaskExecutor` tries to reconnect to the last known `JobMaster` location ## Verifying this change - Added `RegisteredRpcConnection#testReconnect` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink resumeLostJobMasterConnection Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5267.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5267 commit f9bccf20b046e4f73a52ca2a4b842ca985dfaaa8 Author: Till RohrmannDate: 2018-01-09T16:50:37Z [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now completed from the AkkaRpcActor#postStop method. commit 21524394bc37372dc13eb5c3938de051cbd6f03e Author: Till Rohrmann Date: 2018-01-08T17:23:27Z [FLINK-7910] [tests] Generalize Test(Stream)Environment to use JobExecutor This commit introduces the JobExecutor interface which abstracts the actual mini cluster from the Test(Stream)Environment. By letting the Flip-6 MiniCluster as well as the FlinkMiniCluster implement this interface, we can run all test base jobs either on the Flip-6 mini cluster or on the current mini cluster. This closes #4897. commit 5f1bdc1a8546e24e079753f92f22a397ccba24de Author: Till Rohrmann Date: 2017-12-01T14:02:09Z [FLINK-8389] [flip6] Release all slots upon closing of JobManager connection commit 141f21d85f0047e4e5c0776e70ac6d83a03e5943 Author: Till Rohrmann Date: 2018-01-09T13:11:20Z [hotfix] Add retrieval of key sets to DualKeyMap commit d46ba9c5f6c1320d73b4d0e65462bcf2c45ff28f Author: Till Rohrmann Date: 2017-12-01T14:10:46Z [hotfix] Enable checkpointing RPC calls commit 737676e5912a2b43dd195ab2a940bc15af6630fb Author: Till Rohrmann Date: 2018-01-09T08:28:34Z [hotfix] Add JavaDocs to OnCompletionActions commit f456248a2e75da4947cea7f2d863db129f0efc5f Author: Till Rohrmann Date: 2018-01-09T15:44:59Z [hotfix] Refactor JobMasterTest to avoid using Mockito commit fa7b667a196980e05194b88ba352a12f330b5ad0 Author: Till Rohrmann Date: 2018-01-09T19:37:08Z [FLINK-8393] Reconnect to last known JobMaster when connection is lost ---
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319049#comment-16319049 ] ASF GitHub Bot commented on FLINK-8360: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5239 Thanks for going through the general design @StephanEwen ! As we discussed, I agree with your first point. For the second point about RocksDB, this PR already contains an optimized way to deal with incremental local checkpoints that we did not discuss in our review, because I thought it is too much of a low level detail. It does not work with duplicating streams. Instead, I introduced a state handle type for a local directory. In fact, I mapped the previous incremental recovery from DFS state also to this new handle type: dfs state is first downloaded and then it also simply becomes a local directory state handle. From there, both incremental recovery paths are identical. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky is possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5239: [FLINK-8360] Implement task-local state recovery
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5239 Thanks for going through the general design @StephanEwen ! As we discussed, I agree with your first point. For the second point about RocksDB, this PR already contains an optimized way to deal with incremental local checkpoints that we did not discuss in our review, because I thought it is too much of a low level detail. It does not work with duplicating streams. Instead, I introduced a state handle type for a local directory. In fact, I mapped the previous incremental recovery from DFS state also to this new handle type: dfs state is first downloaded and then it also simply becomes a local directory state handle. From there, both incremental recovery paths are identical. ---
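As an illustration of the recovery preference described in the ticket (try the task-local secondary copy first, fall back to the primary copy in DFS); the `StateCopy` interface and the surrounding class are hypothetical, not Flink APIs:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;

interface StateCopy {
	InputStream open() throws IOException;
}

class LocalFirstStateRestore {

	InputStream openForRestore(Optional<StateCopy> localCopy, StateCopy dfsCopy) throws IOException {
		if (localCopy.isPresent()) {
			try {
				return localCopy.get().open();    // no network traffic needed
			} catch (IOException e) {
				// local copy unusable (e.g. the slot moved to another TaskManager) -> fall back
			}
		}
		return dfsCopy.open();                    // primary copy, reported to the checkpoint coordinator
	}
}
```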
[jira] [Created] (FLINK-8393) Reconnect to last known JobMaster when connection is lost
Till Rohrmann created FLINK-8393: Summary: Reconnect to last known JobMaster when connection is lost Key: FLINK-8393 URL: https://issues.apache.org/jira/browse/FLINK-8393 Project: Flink Issue Type: Bug Components: Distributed Coordination Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 In case of a connection loss to the {{JobMaster}}, e.g. due to a heartbeat timeout or a disconnect call, the {{TaskExecutor}} should try to reconnect to the last known {{JobMaster}} location in case the timeout was a false positive. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
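A bare-bones sketch of the reconnect idea (illustrative only, not the actual TaskExecutor code): remember the last known address when a connection is established and try it again when the connection is lost, since the timeout may have been a false positive.

```java
class JobMasterReconnector {
	private volatile String lastKnownJobMasterAddress;

	void connectionEstablished(String jobMasterAddress) {
		this.lastKnownJobMasterAddress = jobMasterAddress;
	}

	void connectionLost() {
		String address = lastKnownJobMasterAddress;
		if (address != null) {
			openConnection(address); // retry the old location before waiting for a new leader
		}
	}

	private void openConnection(String address) {
		// establishing the RPC connection is out of scope for this sketch
	}
}
```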
[jira] [Closed] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-6951. --- Resolution: Invalid Not a problem anymore > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Ted Yu >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for Flink kinesis connector : > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building Flink kinesis connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319021#comment-16319021 ] ASF GitHub Bot commented on FLINK-6951: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4150 Thanks for confirming. I'll close this ticket and PR > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Ted Yu >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for Flink kinesis connector : > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building Flink kinesis connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4150 Thanks for confirming. I'll close this ticket and PR ---
[jira] [Closed] (FLINK-8284) Custom metrics not being exposed for Prometheus
[ https://issues.apache.org/jira/browse/FLINK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julio Biason closed FLINK-8284. --- Resolution: Invalid > Custom metrics not being exposed for Prometheus > --- > > Key: FLINK-8284 > URL: https://issues.apache.org/jira/browse/FLINK-8284 > Project: Flink > Issue Type: Bug > Components: Documentation, Metrics >Affects Versions: 1.4.0 > Environment: Linux/CentOS 7 >Reporter: Julio Biason > > Following the documentation, we changed our filter that removes events with > missing fields to a RichFilterFunction, so we can capture metrics about such > events: > {code:scala} > public class MissingClientFilter extends RichFilterFunction { > private transient Counter counter; > @Override > public void open(Configuration config) { > this.counter = getRuntimeContext() > .getMetricGroup() > .addGroup("events") > .counter("missingClient"); > } > @Override > public boolean filter(LineData line) { > String client = line.get("client").toString(); > boolean missing = client.trim().equals(""); > if (!missing) { > this.count(); > } > return !missing; > } > private void count() { > if (this.counter != null) { > this.counter.inc(); > } > } > } > {code} > We also added Prometheus as our reporter: > {noformat} > metrics.reporters: prom > metrics.reporter.prom.port: 9105 > metrics.reporter.prom.class: > org.apache.flink.metrics.prometheus.PrometheusReporter > {noformat} > The problem is accessing port 9105 display all Flink metrics, but not ours. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8284) Custom metrics not being exposed for Prometheus
[ https://issues.apache.org/jira/browse/FLINK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318999#comment-16318999 ] Julio Biason commented on FLINK-8284: - I swear to ${deity} that it didn't work. Restarted the project to check logs and suddenly the metrics are there. Closing this as invalid. Will reopen if I find any reason for it not working in the first place. > Custom metrics not being exposed for Prometheus > --- > > Key: FLINK-8284 > URL: https://issues.apache.org/jira/browse/FLINK-8284 > Project: Flink > Issue Type: Bug > Components: Documentation, Metrics >Affects Versions: 1.4.0 > Environment: Linux/CentOS 7 >Reporter: Julio Biason > > Following the documentation, we changed our filter that removes events with > missing fields to a RichFilterFunction, so we can capture metrics about such > events: > {code:java} > public class MissingClientFilter extends RichFilterFunction<LineData> { > private transient Counter counter; > @Override > public void open(Configuration config) { > this.counter = getRuntimeContext() > .getMetricGroup() > .addGroup("events") > .counter("missingClient"); > } > @Override > public boolean filter(LineData line) { > String client = line.get("client").toString(); > boolean missing = client.trim().equals(""); > if (!missing) { > this.count(); > } > return !missing; > } > private void count() { > if (this.counter != null) { > this.counter.inc(); > } > } > } > {code} > We also added Prometheus as our reporter: > {noformat} > metrics.reporters: prom > metrics.reporter.prom.port: 9105 > metrics.reporter.prom.class: > org.apache.flink.metrics.prometheus.PrometheusReporter > {noformat} > The problem is that accessing port 9105 displays all Flink metrics, but not ours. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
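One note on the snippet in the report above: as written, the {{missingClient}} counter is incremented on the branch where the client field is present. A minimal sketch with the increment moved to the missing branch, assuming that is the intended semantics ({{LineData}} is the reporter's own record type; everything else uses the standard Flink metrics API):
{code:java}
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Sketch only: same structure as the reported filter, but the counter is
// incremented for events whose "client" field is missing (assumed intent).
// LineData is the reporter's own record type.
public class MissingClientFilter extends RichFilterFunction<LineData> {

    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        this.counter = getRuntimeContext()
            .getMetricGroup()
            .addGroup("events")
            .counter("missingClient");
    }

    @Override
    public boolean filter(LineData line) {
        String client = line.get("client").toString();
        boolean missing = client.trim().isEmpty();
        if (missing) {
            counter.inc(); // count the event we are about to drop
        }
        return !missing;   // keep only events that have a client
    }
}
{code}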
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318974#comment-16318974 ] ASF GitHub Bot commented on FLINK-8360: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5239 I did a peer review and walked through the code. Overall, the design looks good, +1 for that! Some comments: - I would argue to change the way that these recovery options are configured. Currently, this goes through methods on the state backend objects, i.e., *configuration in code*. Because that recovery aspect is really an "ops-related" aspect of running a Flink job (or a broader streaming platform), it should not be configured in code, but in the config. I found it helpful to think that settings in code are for what concerns the application developers, and settings in the config are for what concerns the people that run Flink. They may be the same person in the end, but even then it is helpful because they frequently are in different stages of the application development and deployment. Configurations are easier to "standardize on", like "we want all applications in that group to enable local recovery". - One thing I am not yet 100% sure of is how this will interact in the future with RocksDB's optimized local recovery. I assume that checkpoints will in the future always use incremental snapshots. For those, there is no additional stream of bytes to store locally. The files are already local and immutable. Here, the RocksDB snapshot should probably directly go through the local recovery directory, and the diff files would be persisted from there (the complete snapshot, which consists only of hardlinks to the files that are also in the work directory, would be retained though). Is the assumption that this is a "retain data structure" style mechanism, bespoke for each state backend, similar to retaining the heap copy-on-write table for the Heap State Backend? Now, since this PR is already complicated and needs a heavy rebase, I would be okay with doing that in another PR, if there is commitment to do this soon (before the 1.5 release branch is cut). Slightly off topic: This code has a very distinct style of using many `@Nonnull` annotations. Other newer parts of the code (the ones that use annotations) follow the contract "non-null unless annotated with `@Nullable`". I'm not asking to change this. Would be good to actually have a discussion and come up with a recommended style to agree on for the future. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky as possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5239: [FLINK-8360] Implement task-local state recovery
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5239 I did a peer review and walked through the code. Overall, the design looks good, +1 for that! Some comments: - I would argue to change the way that these recovery options are configured. Currently, this goes through methods on the state backend objects, i.e., *configuration in code*. Because that recovery aspect is really an "ops-related" aspect of running a Flink job (or a broader streaming platform), it should not be configured in code, but in the config. I found it helpful to think that settings in code are for what concerns the application developers, and settings in the config are for what concerns the people that run Flink. They may be the same person in the end, but even then it is helpful because they frequently are in different stages of the application development and deployment. Configurations are easier to "standardize on", like "we want all applications in that group to enable local recovery". - One thing I am not yet 100% sure of is how this will interact in the future with RocksDB's optimized local recovery. I assume that checkpoints will in the future always use incremental snapshots. For those, there is no additional stream of bytes to store locally. The files are already local and immutable. Here, the RocksDB snapshot should probably directly go through the local recovery directory, and the diff files would be persisted from there (the complete snapshot, which consists only of hardlinks to the files that are also in the work directory, would be retained though). Is the assumption that this is a "retain data structure" style mechanism, bespoke for each state backend, similar to retaining the heap copy-on-write table for the Heap State Backend? Now, since this PR is already complicated and needs a heavy rebase, I would be okay with doing that in another PR, if there is commitment to do this soon (before the 1.5 release branch is cut). Slightly off topic: This code has a very distinct style of using many `@Nonnull` annotations. Other newer parts of the code (the ones that use annotations) follow the contract "non-null unless annotated with `@Nullable`". I'm not asking to change this. Would be good to actually have a discussion and come up with a recommended style to agree on for the future. ---
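To make the config-vs-code suggestion concrete, a hypothetical sketch of how local recovery could be exposed as a regular config option rather than a setter on the state backend (the key name {{state.backend.local-recovery}}, the class name, and the default are assumptions for illustration, not the final API):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Sketch only: option key and class name are made up for illustration.
public class LocalRecoveryOptions {

    public static final ConfigOption<Boolean> LOCAL_RECOVERY =
        ConfigOptions.key("state.backend.local-recovery")
            .defaultValue(false);

    // The state backend / task executor would read the flag from the cluster
    // configuration instead of requiring a setter call in user code.
    public static boolean isLocalRecoveryEnabled(Configuration config) {
        return config.getBoolean(LOCAL_RECOVERY);
    }
}
{code}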
[jira] [Commented] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318913#comment-16318913 ] ASF GitHub Bot commented on FLINK-8360: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r160492829 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java --- @@ -43,9 +44,9 @@ CheckpointStateOutputStream createCheckpointStateOutputStream( * Closes the stream factory, releasing all internal resources, but does not delete any * persistent checkpoint data. * -* @throws Exception Exceptions can be forwarded and will be logged by the system +* @throws IOException Exceptions can be forwarded and will be logged by the system */ - void close() throws Exception; + void close() throws IOException; --- End diff -- I think this method is actually removed in the latest master. > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky as possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r160492829 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java --- @@ -43,9 +44,9 @@ CheckpointStateOutputStream createCheckpointStateOutputStream( * Closes the stream factory, releasing all internal resources, but does not delete any * persistent checkpoint data. * -* @throws Exception Exceptions can be forwarded and will be logged by the system +* @throws IOException Exceptions can be forwarded and will be logged by the system */ - void close() throws Exception; + void close() throws IOException; --- End diff -- I think this method is actually removed in the latest master. ---
[jira] [Commented] (FLINK-8082) Bump version compatibility check to 1.4
[ https://issues.apache.org/jira/browse/FLINK-8082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318906#comment-16318906 ] ASF GitHub Bot commented on FLINK-8082: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5262 Fails only with the now fixed "duplicate entry" error in `flink-mesos`, so looks like this is fine. > Bump version compatibility check to 1.4 > --- > > Key: FLINK-8082 > URL: https://issues.apache.org/jira/browse/FLINK-8082 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.5.0 > > > Similar to FLINK-7977, we must bump the version of the compatibility check to > compare 1.5 against 1.4, once it is released. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5262: [FLINK-8082][build] Bump flink version for japicmp plugin
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5262 Fails only with the now fixed "duplicate entry" error in `flink-mesos`, so looks like this is fine. ---
[jira] [Commented] (FLINK-5982) Refactor AbstractInvokable and StatefulTask
[ https://issues.apache.org/jira/browse/FLINK-5982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318879#comment-16318879 ] ASF GitHub Bot commented on FLINK-5982: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3633 > Refactor AbstractInvokable and StatefulTask > --- > > Key: FLINK-5982 > URL: https://issues.apache.org/jira/browse/FLINK-5982 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Wei-Che Wei >Assignee: Wei-Che Wei > > Currently, running an invokable in {{Task}} needs to call > {{setEnvironment(env)}} and {{invoke()}}. If the invokable is also a > {{StatefulTask}}, it needs to call {{setInitialState(state)}}. That makes it > difficult to do the eager initialization of the invokable during the > {{DEPLOYING}} state. One solution discussed in FLINK-4714 is to separate > {{invoke()}} into {{open()}} and {{invoke()}}, but that adds complexity > to running it in {{Task}}. > This task wants to refactor {{AbstractInvokable}} and {{StatefulTask}} to > make it easier to construct and run an invokable. > # Refactor the abstract class to have one default constructor. > #* Drop {{StatefulTask}} and assume all subclasses of {{AbstractInvokable}} > are stateful. > #* Remove {{setEnvironment(env)}} and {{setInitialState(state)}}. Make > {{AbstractInvokable}} have a two-argument constructor with {{Environment}} > and {{TaskStateHandles}}. > # Update all subclasses > #* Make all subclasses of {{AbstractInvokable}} have a two-argument constructor > and call the constructor in {{AbstractInvokable}}. > #* Throw an error in {{BatchTask}} if the initial state is not null. (This > will be removed after {{BatchTask}} has been made stateful.) > # Change the creation of the invokable to call that constructor, and update all > the tests. > Then, we can simplify the logic to run an invokable by using the constructor and > {{invoke()}}. The eager initialization can easily be placed in the > constructor to fulfill requirements such as FLINK-4714. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #3633: [FLINK-5982] [runtime] Refactor AbstractInvokable ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3633 ---
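As a rough illustration of the constructor-based design described in FLINK-5982 above, a minimal sketch under the assumptions in the issue text ({{Environment}} and {{TaskStateHandles}} are the types named in the issue; the actual signatures in the merged PR may differ):
{code:java}
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.TaskStateHandles;

// Sketch of the refactoring described in the issue; not the merged code.
public abstract class AbstractInvokable {

    private final Environment environment;
    private final TaskStateHandles initialState;

    // Environment and restored state are handed over at construction time,
    // replacing setEnvironment(env) and setInitialState(state).
    protected AbstractInvokable(Environment environment, TaskStateHandles initialState) {
        this.environment = environment;
        this.initialState = initialState;
    }

    public Environment getEnvironment() {
        return environment;
    }

    public TaskStateHandles getInitialState() {
        return initialState;
    }

    public abstract void invoke() throws Exception;
}

// A subclass simply forwards the two arguments; eager initialization
// (cf. FLINK-4714) can now happen directly in its constructor.
class MyTask extends AbstractInvokable {

    MyTask(Environment environment, TaskStateHandles initialState) {
        super(environment, initialState);
        // eager initialization goes here
    }

    @Override
    public void invoke() throws Exception {
        // run the task
    }
}
{code}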
[jira] [Commented] (FLINK-8350) replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components
[ https://issues.apache.org/jira/browse/FLINK-8350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318834#comment-16318834 ] Chesnay Schepler commented on FLINK-8350: - I'm not so sure about making {{taskmanager.tmp.dirs}} a deprecated key that is shared across all components. That would mean that an outdated configuration may suddenly affect the jobmanager. > replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components > > > Key: FLINK-8350 > URL: https://issues.apache.org/jira/browse/FLINK-8350 > Project: Flink > Issue Type: Improvement > Components: Cluster Management, Configuration >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Currently, there is only a {{taskmanager.tmp.dirs}} configuration parameter > which (if unset) is set to YARN/Mesos' application environment paths (the > latter not quite yet). With FLINK-8279, we also used this as a fall-back for > the BLOB caches and would like to use it for the BLOB server as well. This, > however, does not reside on the TaskManager and it only makes sense to have a > single temporary directory configuration parameter (if desired, this could be > extended). > I propose to change this to a more generic {{env.io.tmp.dirs}} used by all > components, i.e. JobManager, JobMaster, Dispatcher, and all the > TaskManager-related instances for both YARN and Mesos. > > TODO: set this value to the appropriate folders for the JobManager code > paths > during cluster deployment (this exists for the TaskManager only for now) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318827#comment-16318827 ] ASF GitHub Bot commented on FLINK-4816: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4828 I think this approach is not yet sufficient. There can be various reasons why a failure in DEPLOY happens; a failed checkpoint restore is only one of them. This also adds some coupling of execution graph state and checkpoint coordinator (last restored checkpoint ID) which breaks design and responsibilities. A proper solution here is probably a bit more comprehensive - and needs a bit more thinking, probably a bigger design document. My first thought would be to report a proper RestoreException from the TaskManager, keeping a history of exceptions that triggered recovery, using that to evaluate fallback, etc. > Executions failed from "DEPLOYING" should retain restored checkpoint > information > > > Key: FLINK-4816 > URL: https://issues.apache.org/jira/browse/FLINK-4816 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Assignee: Wei-Che Wei > > When an execution fails from state {{DEPLOYING}}, it should wrap the failure > to better report the failure cause: > - If no checkpoint was restored, it should wrap the exception in a > {{DeployTaskException}} > - If a checkpoint was restored, it should wrap the exception in a > {{RestoreTaskException}} and record the id of the checkpoint that was > attempted to be restored. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4828: [FLINK-4816] [checkpoints] Executions failed from "DEPLOY...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4828 I think this approach is not yet sufficient. There can be various reasons why a failure in DEPLOY happens; a failed checkpoint restore is only one of them. This also adds some coupling of execution graph state and checkpoint coordinator (last restored checkpoint ID) which breaks design and responsibilities. A proper solution here is probably a bit more comprehensive - and needs a bit more thinking, probably a bigger design document. My first thought would be to report a proper RestoreException from the TaskManager, keeping a history of exceptions that triggered recovery, using that to evaluate fallback, etc. ---
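For context, a minimal sketch of the exception wrapping described in FLINK-4816 (the class names {{DeployTaskException}} and {{RestoreTaskException}} come from the issue text; everything else is an assumption, and, as the review above notes, the eventual design will likely be more comprehensive):
{code:java}
// Sketch only: illustrates the wrapping described in the issue, not the
// eventual design.
public final class DeployingFailureUtil {

    static class DeployTaskException extends Exception {
        DeployTaskException(Throwable cause) {
            super("Task failed during deployment", cause);
        }
    }

    static class RestoreTaskException extends Exception {
        private final long checkpointId;

        RestoreTaskException(long checkpointId, Throwable cause) {
            super("Task failed while restoring checkpoint " + checkpointId, cause);
            this.checkpointId = checkpointId;
        }

        long getCheckpointId() {
            return checkpointId;
        }
    }

    // Wrap a failure observed while the execution is still in DEPLOYING:
    // record the restored checkpoint id only if a checkpoint was restored.
    static Exception wrapDeployingFailure(Throwable cause, Long restoredCheckpointId) {
        return restoredCheckpointId == null
            ? new DeployTaskException(cause)
            : new RestoreTaskException(restoredCheckpointId, cause);
    }

    private DeployingFailureUtil() {}
}
{code}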
[jira] [Commented] (FLINK-8350) replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components
[ https://issues.apache.org/jira/browse/FLINK-8350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318819#comment-16318819 ] Stephan Ewen commented on FLINK-8350: - {{io.tmp.dirs}} is fine with me. Please keep the {{taskmanager.tmp.dirs}} as a deprecated key for that config option. > replace "taskmanager.tmp.dirs" with "env.io.tmp.dirs" for all components > > > Key: FLINK-8350 > URL: https://issues.apache.org/jira/browse/FLINK-8350 > Project: Flink > Issue Type: Improvement > Components: Cluster Management, Configuration >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > Currently, there is only a {{taskmanager.tmp.dirs}} configuration parameter > which (if unset) is set to YARN/Mesos' application environment paths (the > latter not quite yet). With FLINK-8279, we also used this as a fall-back for > the BLOB caches and would like to use it for the BLOB server as well. This, > however, does not reside on the TaskManager and it only makes sense to have a > single temporary directory configuration parameter (if desired, this could be > extended). > I propose to change this to a more generic {{env.io.tmp.dirs}} used by all > components, i.e. JobManager, JobMaster, Dispatcher, and all the > TaskManager-related instances for both YARN and Mesos. > > TODO: set this value to the appropriate folders for the JobManager code > paths > during cluster deployment (this exists for the TaskManager only for now) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
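A minimal sketch of the deprecated-key fallback with Flink's {{ConfigOptions}} builder, following the suggestion above (the enclosing class name and default value are illustrative; whether the old key should be shared across all components is exactly the concern raised earlier):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch only: class name and default value are illustrative.
public class TmpDirOptions {

    // New generic key, with the old TaskManager-specific key kept as a
    // deprecated fallback so existing configurations keep working.
    public static final ConfigOption<String> TMP_DIRS =
        ConfigOptions.key("io.tmp.dirs")
            .defaultValue(System.getProperty("java.io.tmpdir"))
            .withDeprecatedKeys("taskmanager.tmp.dirs");
}
{code}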
[jira] [Commented] (FLINK-7475) support update() in ListState
[ https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318760#comment-16318760 ] ASF GitHub Bot commented on FLINK-7475: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4963 @StefanRRichter Thanks! I will > support update() in ListState > - > > Key: FLINK-7475 > URL: https://issues.apache.org/jira/browse/FLINK-7475 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API, State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: yf >Assignee: Bowen Li > Fix For: 1.5.0 > > > If I want to update the list, > I have to do two steps: > listState.clear() > for (Element e : myList) { > listState.add(e); > } > Why can't I update the state with: > listState.update(myList)? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4963: [FLINK-7475] [core][DataStream API] support update() in L...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4963 @StefanRRichter Thanks! I will ---
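To illustrate the convenience requested in FLINK-7475, a small sketch comparing the two approaches (assuming a {{ListState<Element>}} obtained from the keyed state backend; {{Element}} is just a placeholder type from the issue's example):
{code:java}
import java.util.List;
import org.apache.flink.api.common.state.ListState;

public class ListStateUpdateExample {

    // Placeholder element type from the issue's example.
    static class Element {}

    // Before: replacing the stored list requires clear() plus one add() per element.
    static void replaceOldWay(ListState<Element> listState, List<Element> myList) throws Exception {
        listState.clear();
        for (Element e : myList) {
            listState.add(e);
        }
    }

    // With FLINK-7475: a single call replaces the stored list.
    static void replaceNewWay(ListState<Element> listState, List<Element> myList) throws Exception {
        listState.update(myList);
    }
}
{code}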
[jira] [Commented] (FLINK-8392) Simplify termination future completion
[ https://issues.apache.org/jira/browse/FLINK-8392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318754#comment-16318754 ] ASF GitHub Bot commented on FLINK-8392: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5266 [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop ## What is the purpose of the change Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now completed from the AkkaRpcActor#postStop method. This enables us to wait in the `RpcEndpoint#postStop` method for the termination of logically nested `RpcEndpoint`s. ## Verifying this change - Covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink simplifyRpcTerminationFuture Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5266.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5266 commit f9bccf20b046e4f73a52ca2a4b842ca985dfaaa8 Author: Till Rohrmann Date: 2018-01-09T16:50:37Z [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now completed from the AkkaRpcActor#postStop method. > Simplify termination future completion > -- > > Key: FLINK-8392 > URL: https://issues.apache.org/jira/browse/FLINK-8392 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > With FLINK-7754, we tried to complete the termination future after an > {{Actor}} has been completely stopped and has been removed from the > {{ActorSystem}}. This, however, is not possible. Furthermore, this change > made it impossible for a {{RpcEndpoint}} to wait for the termination of > another {{RpcEndpoint}} in its {{RpcEndpoint#postStop}} method. Therefore, I > propose to revert the changes done by FLINK-7754. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5266: [FLINK-8392] [rpc] Let termination future be compl...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5266 [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop ## What is the purpose of the change Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now completed from the AkkaRpcActor#postStop method. This enables us to wait in the `RpcEndpoint#postStop` method for the termination of logically nested `RpcEndpoint`s. ## Verifying this change - Covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink simplifyRpcTerminationFuture Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5266.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5266 commit f9bccf20b046e4f73a52ca2a4b842ca985dfaaa8 Author: Till Rohrmann Date: 2018-01-09T16:50:37Z [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now completed from the AkkaRpcActor#postStop method. ---
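A hypothetical sketch of the pattern this change enables, i.e. waiting in {{postStop()}} for a logically nested endpoint to terminate (the nested endpoint below is a stand-in with an assumed {{getTerminationFuture()}} method, not the actual RpcEndpoint API):
{code:java}
import java.util.concurrent.CompletableFuture;

// Sketch only: the nested endpoint is a stand-in, not Flink's RpcEndpoint.
public abstract class OuterEndpoint {

    private final NestedEndpoint nested = new NestedEndpoint();

    // With FLINK-8392 the termination future of a real RpcEndpoint is completed
    // from AkkaRpcActor#postStop, so a wait like this can actually finish.
    public void postStop() throws Exception {
        nested.shutDown();
        nested.getTerminationFuture().get();
    }

    static class NestedEndpoint {

        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        void shutDown() {
            // ... stop internal services, then complete the termination future
            terminationFuture.complete(null);
        }

        CompletableFuture<Void> getTerminationFuture() {
            return terminationFuture;
        }
    }
}
{code}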