[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery
Github user sihuazhou closed the pull request at: https://github.com/apache/flink/pull/5074 ---
[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379918#comment-16379918 ]

ASF GitHub Bot commented on FLINK-7873:
---------------------------------------
Github user sihuazhou closed the pull request at: https://github.com/apache/flink/pull/5074

> Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> Why introduce this:
> The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is large (e.g. 1 TB). What's worse, if the job recovers again and again, it can eat up all of the network bandwidth and badly hurt the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote store only if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It simply uses a TTL-like method to manage cache-entry disposal: an entry is disposed of if it has not been touched for some time X, and touching an entry resets its TTL. This way, all the work is done by the TaskManager and is transparent to the JobManager. The only problem is that we may dispose of an entry that is still useful; in that case we have to fall back to reading the remote data, but users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou closed FLINK-7873.
-----------------------------
    Resolution: Duplicate
  Release Note: fixed by Stefan.

> Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> Why introduce this:
> The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is large (e.g. 1 TB). What's worse, if the job recovers again and again, it can eat up all of the network bandwidth and badly hurt the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote store only if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It simply uses a TTL-like method to manage cache-entry disposal: an entry is disposed of if it has not been touched for some time X, and touching an entry resets its TTL. This way, all the work is done by the TaskManager and is transparent to the JobManager. The only problem is that we may dispose of an entry that is still useful; in that case we have to fall back to reading the remote data, but users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
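The TTL-based eviction proposed in the issue above could be sketched like this (an illustrative sketch with hypothetical class and method names, not the code from the closed PR):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Minimal TTL-cache sketch: touching an entry resets its TTL; prune()
 * disposes of every entry that has not been touched within the TTL.
 */
class TtlCheckpointCache<K, V> {

    private final long ttlMillis;
    private final Map<K, V> data = new HashMap<>();
    private final Map<K, Long> lastTouched = new HashMap<>();

    TtlCheckpointCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(K key, V value, long nowMillis) {
        data.put(key, value);
        lastTouched.put(key, nowMillis);
    }

    /** Returns the cached value and resets its TTL, or null on a cache miss (forcing a remote read). */
    V touch(K key, long nowMillis) {
        V value = data.get(key);
        if (value != null) {
            lastTouched.put(key, nowMillis);
        }
        return value;
    }

    /** Dispose of every entry that has been idle longer than the TTL. */
    void prune(long nowMillis) {
        Iterator<Map.Entry<K, Long>> it = lastTouched.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> e = it.next();
            if (nowMillis - e.getValue() > ttlMillis) {
                data.remove(e.getKey());
                it.remove();
            }
        }
    }

    int size() {
        return data.size();
    }
}
```

As the issue notes, a TTL chosen relative to the checkpoint interval decides how often a useful entry is evicted and the recovery falls back to the remote read.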
[jira] [Assigned] (FLINK-8160) Extend OperatorHarness to expose metrics
[ https://issues.apache.org/jira/browse/FLINK-8160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tuo Wang reassigned FLINK-8160:
-------------------------------
    Assignee: Tuo Wang

> Extend OperatorHarness to expose metrics
> -----------------------------------------
>
>                 Key: FLINK-8160
>                 URL: https://issues.apache.org/jira/browse/FLINK-8160
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics, Streaming
>            Reporter: Chesnay Schepler
>            Assignee: Tuo Wang
>            Priority: Major
>             Fix For: 1.5.0
>
> To better test interactions between operators and metrics, the harness should expose the metrics registered by the operator.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yanxiaobin updated FLINK-8794:
------------------------------
Description:
When using BucketingSink, it can happen that one of the files is always in the [.in-progress] state, and this state never changes afterwards. The underlying storage is S3.
{code:java}
// code placeholder
{code}
2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color}
2018-02-28 12:06:27 147315059 part-0-0
2018-02-28 12:06:27 147462359 part-1-0
2018-02-28 12:06:27 147316006 part-10-0
2018-02-28 12:06:28 147349854 part-100-0
2018-02-28 12:06:27 147421625 part-101-0
2018-02-28 12:06:27 147443830 part-102-0
2018-02-28 12:06:27 147372801 part-103-0
2018-02-28 12:06:27 147343670 part-104-0
..

was: (the same description and file listing, with _part-28-0.in-progress shown without the color markup)

> When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8794
>                 URL: https://issues.apache.org/jira/browse/FLINK-8794
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.4.0, 1.4.1
>            Reporter: yanxiaobin
>            Priority: Major
>
> When using BucketingSink, it can happen that one of the files is always in the [.in-progress] state, and this state never changes afterwards. The underlying storage is S3.
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27 147315059 part-0-0
> 2018-02-28 12:06:27 147462359 part-1-0
> 2018-02-28 12:06:27 147316006 part-10-0
> 2018-02-28 12:06:28 147349854 part-100-0
> 2018-02-28 12:06:27 147421625 part-101-0
> 2018-02-28 12:06:27 147443830 part-102-0
> 2018-02-28 12:06:27 147372801 part-103-0
> 2018-02-28 12:06:27 147343670 part-104-0
> ..

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yanxiaobin updated FLINK-8794:
------------------------------
Description:
When using BucketingSink, it can happen that one of the files is always in the [.in-progress] state, and this state never changes afterwards. The underlying storage is S3.
{code:java}
// code placeholder
{code}
2018-02-28 11:58:42 147341619 _part-28-0.in-progress
2018-02-28 12:06:27 147315059 part-0-0
2018-02-28 12:06:27 147462359 part-1-0
2018-02-28 12:06:27 147316006 part-10-0
2018-02-28 12:06:28 147349854 part-100-0
2018-02-28 12:06:27 147421625 part-101-0
2018-02-28 12:06:27 147443830 part-102-0
2018-02-28 12:06:27 147372801 part-103-0
2018-02-28 12:06:27 147343670 part-104-0
..

was: When using BucketingSink, it happens that one of the files is always in the [.in-progress] state. And this state has never changed after that. The underlying use of S3 as storage.

> When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8794
>                 URL: https://issues.apache.org/jira/browse/FLINK-8794
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.4.0, 1.4.1
>            Reporter: yanxiaobin
>            Priority: Major
>
> When using BucketingSink, it can happen that one of the files is always in the [.in-progress] state, and this state never changes afterwards. The underlying storage is S3.
> {code:java}
> > // code placeholder
> {code}
> 2018-02-28 11:58:42 147341619 _part-28-0.in-progress
> 2018-02-28 12:06:27 147315059 part-0-0
> 2018-02-28 12:06:27 147462359 part-1-0
> 2018-02-28 12:06:27 147316006 part-10-0
> 2018-02-28 12:06:28 147349854 part-100-0
> 2018-02-28 12:06:27 147421625 part-101-0
> 2018-02-28 12:06:27 147443830 part-102-0
> 2018-02-28 12:06:27 147372801 part-103-0
> 2018-02-28 12:06:27 147343670 part-104-0
> ..

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379815#comment-16379815 ]

Thomas Weise edited comment on FLINK-5697 at 2/28/18 5:52 AM:
--------------------------------------------------------------
We have implemented periodic watermark support in a customization of FlinkKinesisConsumer via FLINK-8648. The consumer accepts an optional instance of AssignerWithPeriodicWatermarks and uses the configuration from ExecutionConfig#setAutoWatermarkInterval for the timer interval. It also addresses the issue described in FLINK-5479 with an (optional) interval property for the user to specify after how much time since the last record a shard is considered idle and should not hold back the watermark. If there is interest, I would contribute these changes to the current Flink Kinesis connector.

was (Author: thw): (the same comment, previously reading "since the last" instead of "since the last record")

> Add per-shard watermarks for FlinkKinesisConsumer
> --------------------------------------------------
>
>                 Key: FLINK-5697
>                 URL: https://issues.apache.org/jira/browse/FLINK-5697
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>
> It would be nice to bring the Kinesis consumer on par in functionality with the Kafka consumer, since they share very similar abstractions. Per-partition / per-shard watermarks are something we can also add to the Kinesis consumer.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379815#comment-16379815 ]

Thomas Weise commented on FLINK-5697:
-------------------------------------
We have implemented periodic watermark support in a customization of FlinkKinesisConsumer via FLINK-8648. The consumer accepts an optional instance of AssignerWithPeriodicWatermarks and uses the configuration from ExecutionConfig#setAutoWatermarkInterval for the timer interval. It also addresses the issue described in FLINK-5479 with an (optional) interval property for the user to specify after how much time since the last record a shard is considered idle and should not hold back the watermark. If there is interest, I would contribute these changes to the current Flink Kinesis connector.

> Add per-shard watermarks for FlinkKinesisConsumer
> --------------------------------------------------
>
>                 Key: FLINK-5697
>                 URL: https://issues.apache.org/jira/browse/FLINK-5697
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>
> It would be nice to bring the Kinesis consumer on par in functionality with the Kafka consumer, since they share very similar abstractions. Per-partition / per-shard watermarks are something we can also add to the Kinesis consumer.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
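The per-shard watermark and idle-shard behavior described in the comment above can be sketched as follows (a simplified illustration with hypothetical names, not the connector's actual code):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Tracks a watermark per shard. A shard that has seen no record for longer
 * than idleTimeoutMillis is considered idle and does not hold back the
 * overall watermark, which is the minimum over all active shards.
 */
class PerShardWatermarkTracker {

    private final long idleTimeoutMillis;
    private final Map<String, Long> shardWatermarks = new HashMap<>();
    private final Map<String, Long> lastRecordTime = new HashMap<>();

    PerShardWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Advance the shard's watermark (watermarks never regress) and note the arrival time. */
    void onRecord(String shardId, long eventTimestamp, long nowMillis) {
        shardWatermarks.merge(shardId, eventTimestamp, Math::max);
        lastRecordTime.put(shardId, nowMillis);
    }

    /** Minimum watermark over the shards that are not idle; Long.MIN_VALUE if every shard is idle. */
    long currentWatermark(long nowMillis) {
        long watermark = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : shardWatermarks.entrySet()) {
            long idleFor = nowMillis - lastRecordTime.get(e.getKey());
            if (idleFor <= idleTimeoutMillis) {
                watermark = Math.min(watermark, e.getValue());
                anyActive = true;
            }
        }
        return anyActive ? watermark : Long.MIN_VALUE;
    }
}
```

In the actual connector the emission would be driven by the periodic timer configured via ExecutionConfig#setAutoWatermarkInterval; here `currentWatermark` is simply called with the current time.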
[GitHub] flink issue #5578: [FLINK-8777][state]Improve resource release for local rec...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5578

@StefanRRichter I have addressed your suggestions, except the one about making the `if` a bit more complex; instead I introduced a `Predicate` for `pruneCheckpoints()`. I'm not sure whether this is OK with you; if you are still against it, I'd be happy to change the code to make the `if` a bit more complex.

---
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379806#comment-16379806 ]

ASF GitHub Bot commented on FLINK-8777:
---------------------------------------
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5578

@StefanRRichter I have addressed your suggestions, except the one about making the `if` a bit more complex; instead I introduced a `Predicate` for `pruneCheckpoints()`. I'm not sure whether this is OK with you; if you are still against it, I'd be happy to change the code to make the `if` a bit more complex.

> improve resource release when recovery from failover
> ------------------------------------------------------
>
>                 Key: FLINK-8777
>                 URL: https://issues.apache.org/jira/browse/FLINK-8777
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.5.0
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} will be invoked. We can release every entry from {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID == checkpointID}}; this prevents a resource leak when the job loops through {{local checkpoint completed => failed => local checkpoint completed => failed ...}}.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn
[ https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379780#comment-16379780 ] ASF GitHub Bot commented on FLINK-7836: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5593 @tillrohrmann could you review this PR? Thanks! > specifying node label for flink job to run on yarn > -- > > Key: FLINK-7836 > URL: https://issues.apache.org/jira/browse/FLINK-7836 > Project: Flink > Issue Type: New Feature > Components: Client >Affects Versions: 1.3.2 >Reporter: zhaibaba >Assignee: vinoyang >Priority: Major > > flink client cannot specify node label for flink job to run on yarn -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8792) bad semantic of method name MessageQueryParameter.convertStringToValue
[ https://issues.apache.org/jira/browse/FLINK-8792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379779#comment-16379779 ] ASF GitHub Bot commented on FLINK-8792: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5587 @tillrohrmann could you review this PR? Thanks! > bad semantic of method name MessageQueryParameter.convertStringToValue > > > Key: FLINK-8792 > URL: https://issues.apache.org/jira/browse/FLINK-8792 > Project: Flink > Issue Type: Improvement > Components: REST >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > > the method name > {code:java} > MessageQueryParameter.convertStringToValue > {code} > should be > {code:java} > convertValueToString > {code} > or > {code:java} > convertStringFromValue{code} > I think > {code:java} > convertValueToString > {code} > would be better. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5593: [FLINK-7836][Client] specifying node label for flink job ...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5593 @tillrohrmann could you review this PR? Thanks! ---
[GitHub] flink issue #5587: [FLINK-8792][REST] bad semantic of method name MessageQue...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5587 @tillrohrmann could you review this PR? Thanks! ---
[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379777#comment-16379777 ]

Wind commented on FLINK-6847:
-----------------------------
Can you assign it to me if possible?

> Add TIMESTAMPDIFF supported in TableAPI
> -----------------------------------------
>
>                 Key: FLINK-6847
>                 URL: https://issues.apache.org/jira/browse/FLINK-6847
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Priority: Major
>              Labels: starter
>
> see FLINK-6813

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wind updated FLINK-6847: Comment: was deleted (was: can you assign it to me if possible ?) > Add TIMESTAMPDIFF supported in TableAPI > --- > > Key: FLINK-6847 > URL: https://issues.apache.org/jira/browse/FLINK-6847 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Priority: Major > Labels: starter > > see FLINK-6813 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7470) Acquire RM leadership before registering with Mesos
[ https://issues.apache.org/jira/browse/FLINK-7470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379756#comment-16379756 ] Bertrand Bossy commented on FLINK-7470: --- Is there any update on this? FLIP-6 mode for Mesos seems to have severe issues due to that. > Acquire RM leadership before registering with Mesos > --- > > Key: FLINK-7470 > URL: https://issues.apache.org/jira/browse/FLINK-7470 > Project: Flink > Issue Type: Sub-task > Components: Mesos >Reporter: Eron Wright >Priority: Major > > Mesos doesn't support fencing tokens in the scheduler protocol; it assumes > external leader election among scheduler instances. The last connection > wins; prior connections for a given framework ID are closed. > The Mesos RM should not register as a framework until it has acquired RM > leadership. Evolve the ResourceManager as necessary. One option is to > introduce an ResourceManagerRunner that acquires leadership before starting > the RM. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn
[ https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379722#comment-16379722 ]

ASF GitHub Bot commented on FLINK-7836:
---------------------------------------
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/5593

[FLINK-7836][Client] specifying node label for flink job to run on yarn

## What is the purpose of the change

*This pull request lets Flink on YARN specify a YARN node label for the Flink application, in both session and single-job mode.*

## Brief change log

- *define a new command-line option to specify a YARN node label*
- *set the node label for the application through reflection (to support older Hadoop versions)*
- *add the command-line option description*

## Verifying this change

This change has no test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-7836

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5593.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5593

commit 5f7342716223adea6ec1745d319a44a2bd3188da
Author: vinoyang
Date: 2018-02-28T03:02:07Z

    [FLINK-7836][Client] specifying node label for flink job to run on yarn

> specifying node label for flink job to run on yarn
> ---------------------------------------------------
>
>                 Key: FLINK-7836
>                 URL: https://issues.apache.org/jira/browse/FLINK-7836
>             Project: Flink
>          Issue Type: New Feature
>          Components: Client
>    Affects Versions: 1.3.2
>            Reporter: zhaibaba
>            Assignee: vinoyang
>            Priority: Major
>
> flink client cannot specify node label for flink job to run on yarn

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5593: [FLINK-7836][Client] specifying node label for fli...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/5593

[FLINK-7836][Client] specifying node label for flink job to run on yarn

## What is the purpose of the change

*This pull request lets Flink on YARN specify a YARN node label for the Flink application, in both session and single-job mode.*

## Brief change log

- *define a new command-line option to specify a YARN node label*
- *set the node label for the application through reflection (to support older Hadoop versions)*
- *add the command-line option description*

## Verifying this change

This change has no test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-7836

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5593.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5593

commit 5f7342716223adea6ec1745d319a44a2bd3188da
Author: vinoyang
Date: 2018-02-28T03:02:07Z

    [FLINK-7836][Client] specifying node label for flink job to run on yarn

---
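The change log above mentions setting the node label through reflection so the client still runs against Hadoop versions that lack the API. That idea might be sketched as follows (illustrative only; `DemoAppContext` is a hypothetical stand-in for YARN's ApplicationSubmissionContext, and the PR's actual code may differ):

```java
import java.lang.reflect.Method;

/** Sketch: set a YARN node label via reflection, degrading gracefully on older Hadoop. */
class NodeLabelSetter {

    static void trySetNodeLabel(Object appContext, String nodeLabel) {
        try {
            // setNodeLabelExpression(String) only exists on newer Hadoop versions,
            // so it is looked up at runtime instead of being referenced directly.
            Method m = appContext.getClass().getMethod("setNodeLabelExpression", String.class);
            m.setAccessible(true);
            m.invoke(appContext, nodeLabel);
        } catch (NoSuchMethodException e) {
            // Older Hadoop: node labels are unsupported, submit without one.
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to set node label", e);
        }
    }
}

/** Hypothetical stand-in for ApplicationSubmissionContext, used only to demonstrate the helper. */
class DemoAppContext {
    String label;

    public void setNodeLabelExpression(String nodeLabel) {
        this.label = nodeLabel;
    }
}
```

The trade-off of this approach is that a typo in the method name only surfaces at runtime, which is why the graceful `NoSuchMethodException` branch matters.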
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379717#comment-16379717 ]

ASF GitHub Bot commented on FLINK-8777:
---------------------------------------
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r171133066

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
 		}
 	}
 
+	/**
+	 * Pruning the useless checkpoints.
+	 */
+	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
+		Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+			storedTaskStateByCheckpointID.entrySet().iterator();
+
+		final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+		while (entryIterator.hasNext()) {
+
+			Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+			long entryCheckpointId = snapshotEntry.getKey();
+
+			if (entryCheckpointId != checkpointID) {
--- End diff --

I agree with you that the breaking case looks a bit dangerous ... I think maybe we could pass a `Predicate` for the `if` and let the caller pass the `Predicate` into this function. This would make it cleaner on the caller side, without needing to cram extra logic into the `if` to make it more complex.

> improve resource release when recovery from failover
> ------------------------------------------------------
>
>                 Key: FLINK-8777
>                 URL: https://issues.apache.org/jira/browse/FLINK-8777
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.5.0
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} will be invoked. We can release every entry from {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID == checkpointID}}; this prevents a resource leak when the job loops through {{local checkpoint completed => failed => local checkpoint completed => failed ...}}.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r171133066

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
 		}
 	}
 
+	/**
+	 * Pruning the useless checkpoints.
+	 */
+	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
+		Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+			storedTaskStateByCheckpointID.entrySet().iterator();
+
+		final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+		while (entryIterator.hasNext()) {
+
+			Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+			long entryCheckpointId = snapshotEntry.getKey();
+
+			if (entryCheckpointId != checkpointID) {
--- End diff --

I agree with you that the breaking case looks a bit dangerous ... I think maybe we could pass a `Predicate` for the `if` and let the caller pass the `Predicate` into this function. This would make it cleaner on the caller side, without needing to cram extra logic into the `if` to make it more complex.

---
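The `Predicate`-based pruning discussed in this review thread might look roughly like this (a simplified, self-contained sketch with hypothetical names, not the code from the PR; the real store holds `TaskStateSnapshot` objects and must also discard their on-disk state):

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.LongPredicate;

/** Sketch of a local state store that prunes snapshots via a caller-supplied predicate. */
class LocalStateStoreSketch {

    // checkpoint id -> snapshot (a String stands in for TaskStateSnapshot here)
    private final SortedMap<Long, String> storedTaskStateByCheckpointID = new TreeMap<>();

    void store(long checkpointId, String snapshot) {
        storedTaskStateByCheckpointID.put(checkpointId, snapshot);
    }

    /** The caller supplies the pruning condition, so no extra flags leak into this method. */
    void pruneCheckpoints(LongPredicate shouldPrune) {
        storedTaskStateByCheckpointID.keySet().removeIf(id -> shouldPrune.test(id));
    }

    /**
     * On recovery from checkpoint N, every entry with a different checkpoint id is
     * useless and can be released, preventing the leak described in FLINK-8777.
     */
    String retrieveLocalState(long checkpointId) {
        String snapshot = storedTaskStateByCheckpointID.get(checkpointId);
        pruneCheckpoints(id -> id != checkpointId);
        return snapshot;
    }

    int size() {
        return storedTaskStateByCheckpointID.size();
    }
}
```

A caller for checkpoint confirmation could instead pass something like `id -> id < confirmedCheckpointId`, which is the flexibility the reviewer was arguing for over a boolean flag.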
[GitHub] flink pull request #5592: [hotfix] fix javadoc link of ClusterClient#trigger...
GitHub user Matrix42 opened a pull request: https://github.com/apache/flink/pull/5592 [hotfix] fix javadoc link of ClusterClient#triggerSavepoint ## What is the purpose of the change fix javadoc link of ClusterClient#triggerSavepoint You can merge this pull request into a Git repository by running: $ git pull https://github.com/Matrix42/flink doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5592.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5592 commit 09691377720361cd2d05007371dbc579aa876bc0 Author: Matrix42 <934336389@...> Date: 2018-02-28T02:51:42Z [hotfix] fix javadoc link of ClusterClient#triggerSavepoint ---
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379697#comment-16379697 ]

ASF GitHub Bot commented on FLINK-8777:
---------------------------------------
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r171130750

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
 		}
 	}
 
+	/**
+	 * Pruning the useless checkpoints.
+	 */
+	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
--- End diff --

👍

> improve resource release when recovery from failover
> ------------------------------------------------------
>
>                 Key: FLINK-8777
>                 URL: https://issues.apache.org/jira/browse/FLINK-8777
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.5.0
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} will be invoked. We can release every entry from {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID == checkpointID}}; this prevents a resource leak when the job loops through {{local checkpoint completed => failed => local checkpoint completed => failed ...}}.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379696#comment-16379696 ]

ASF GitHub Bot commented on FLINK-8777:
---------------------------------------
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r171130718

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,9 @@
 	@GuardedBy("lock")
 	private boolean disposed;
 
+	/** Whether to discard the useless state when retrieve local checkpoint state. */
+	private boolean retrieveWithDiscard = true;
--- End diff --

Aha, this is just for passing the existing test case in `TaskLocalStateStoreImplTest` ...

```java
private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception {
	for (int i = off; i < len; ++i) {
		TaskStateSnapshot expected = history.get(i);
		Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
		Mockito.verify(expected, Mockito.never()).discardState();
	}
}
```

> improve resource release when recovery from failover
> ------------------------------------------------------
>
>                 Key: FLINK-8777
>                 URL: https://issues.apache.org/jira/browse/FLINK-8777
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.5.0
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} will be invoked. We can release every entry from {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID == checkpointID}}; this prevents a resource leak when the job loops through {{local checkpoint completed => failed => local checkpoint completed => failed ...}}.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r171130767

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
 	TaskStateSnapshot snapshot;
 	synchronized (lock) {
 		snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+		if (retrieveWithDiscard) {
+			// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
--- End diff --

👍

---
[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r171130750 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException { } } + /** +* Pruning the useless checkpoints. +*/ + private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) { + --- End diff -- 👍 ---
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379698#comment-16379698 ] ASF GitHub Bot commented on FLINK-8777: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r171130767 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) { TaskStateSnapshot snapshot; synchronized (lock) { snapshot = storedTaskStateByCheckpointID.get(checkpointID); + + if (retrieveWithDiscard) { + // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others --- End diff -- > improve resource release when recovery from failover > > > Key: FLINK-8777 > URL: https://issues.apache.org/jira/browse/FLINK-8777 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} > will be invoked; we can release every entry in > {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID > == checkpointID}}. This prevents a resource leak when a job loops through > {{local checkpoint completed => failed => local checkpoint completed => > failed ...}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r171130718 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -90,6 +92,9 @@ @GuardedBy("lock") private boolean disposed; + /** Whether to discard the useless state when retrieve local checkpoint state. */ + private boolean retrieveWithDiscard = true; --- End diff -- Aha, this is just for passing the existing test case in `TaskLocalStateStoreImplTest` ...

```java
private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception {
    for (int i = off; i < len; ++i) {
        TaskStateSnapshot expected = history.get(i);
        Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
        Mockito.verify(expected, Mockito.never()).discardState();
    }
}
```

---
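The prune-on-retrieve behavior discussed in this thread can be sketched as follows. This is an illustrative stand-in, not Flink's actual `TaskLocalStateStoreImpl`: the class and method names mirror the discussion, but the `String` payload stands in for `TaskStateSnapshot`, and "discarding" here is just removal from the map.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of FLINK-8777's idea: when a local checkpoint is retrieved for
// recovery, every cached snapshot with a different checkpoint ID can no
// longer be used, so it is released immediately instead of leaking across
// repeated "checkpoint completed -> failed -> recovered" loops.
public class LocalStateStoreSketch {
    private final SortedMap<Long, String> storedTaskStateByCheckpointID = new TreeMap<>();

    public void store(long checkpointID, String snapshot) {
        storedTaskStateByCheckpointID.put(checkpointID, snapshot);
    }

    public String retrieveLocalState(long checkpointID) {
        // Prune everything except the requested checkpoint; a real store
        // would also discard the backing state files here.
        Iterator<Map.Entry<Long, String>> it =
            storedTaskStateByCheckpointID.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getKey() != checkpointID) {
                it.remove();
            }
        }
        return storedTaskStateByCheckpointID.get(checkpointID);
    }

    public int size() {
        return storedTaskStateByCheckpointID.size();
    }
}
```

Retrieving checkpoint 2 from a store holding checkpoints 1–3 releases the other two entries; retrieving an unknown ID releases everything and returns null.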
[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379673#comment-16379673 ] ASF GitHub Bot commented on FLINK-8620: --- Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/5580 @zentol All right, got your point. That's a problem indeed. > Enable shipping custom artifacts to BlobStore and accessing them through > DistributedCache > - > > Key: FLINK-8620 > URL: https://issues.apache.org/jira/browse/FLINK-8620 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > > We should be able to distribute custom files to taskmanagers. To do that we > can store those files in BlobStore and later on access them in TaskManagers > through DistributedCache. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...
Github user ifndef-SleePy commented on the issue: https://github.com/apache/flink/pull/5580 @zentol All right, got your point. That's a problem indeed. ---
[jira] [Updated] (FLINK-8708) Unintended integer division in StandaloneThreadedGenerator
[ https://issues.apache.org/jira/browse/FLINK-8708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-8708: -- Description: In flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java : {code} double factor = (ts - lastTimeStamp) / 1000; {code} Proper casting should be done before the integer division

was: In flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java : {code} double factor = (ts - lastTimeStamp) / 1000; {code} Proper casting should be done before the integer division

> Unintended integer division in StandaloneThreadedGenerator > -- > > Key: FLINK-8708 > URL: https://issues.apache.org/jira/browse/FLINK-8708 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > In > flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java > : > {code} > double factor = (ts - lastTimeStamp) / 1000; > {code} > Proper casting should be done before the integer division -- This message was sent by Atlassian JIRA (v7.6.3#76005)
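The pitfall and its fix can be demonstrated in isolation. The values below are made up for the demonstration; only the flagged expression is taken from the report.

```java
// Demonstrates FLINK-8708: `(ts - lastTimeStamp) / 1000` divides two
// integral values, so the fraction is truncated BEFORE the result is
// widened to double. Dividing by 1000.0 (or casting first) keeps it.
public class IntegerDivisionDemo {

    // Truncating variant, mirroring the code flagged in the issue.
    static double factorTruncated(long ts, long lastTimeStamp) {
        return (ts - lastTimeStamp) / 1000;   // long division: fraction lost
    }

    // Fixed variant: a double literal makes the division floating-point.
    static double factorExact(long ts, long lastTimeStamp) {
        return (ts - lastTimeStamp) / 1000.0;
    }

    public static void main(String[] args) {
        System.out.println(factorTruncated(1500L, 0L)); // 1.0
        System.out.println(factorExact(1500L, 0L));     // 1.5
    }
}
```

An explicit cast, `((double) (ts - lastTimeStamp)) / 1000`, is equivalent to the double-literal form.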
[jira] [Commented] (FLINK-8720) Logging exception with S3 connector and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-8720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379167#comment-16379167 ] dejan miljkovic commented on FLINK-8720: I see from FLINK-8798 that a solution is going to be provided in 1.4.2. Do you know when 1.4.2 is going to be released? > Logging exception with S3 connector and BucketingSink > - > > Key: FLINK-8720 > URL: https://issues.apache.org/jira/browse/FLINK-8720 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.1 >Reporter: dejan miljkovic >Priority: Critical > > Trying to stream data to S3. The code works from IntelliJ. When submitting the code > through the UI on my machine (a single-node cluster started by the start-cluster.sh > script), the stack trace below is produced. > > Below is the link to a simple test app that streams data to S3. > [https://github.com/dmiljkovic/test-flink-bucketingsink-s3] > The behavior is a bit different but the same error is produced. The job works only > once. If the job is submitted a second time, the stack trace below is produced. If I > restart the cluster, the job works, but only for the first time.
> > > org.apache.commons.logging.LogConfigurationException: > java.lang.IllegalAccessError: > org/apache/commons/logging/impl/LogFactoryImpl$3 (Caused by > java.lang.IllegalAccessError: > org/apache/commons/logging/impl/LogFactoryImpl$3) > at > org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637) > at > org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336) > at > org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310) > at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685) > at > org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:76) > at > org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:102) > at > org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:88) > at > org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:96) > at > com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26) > at > com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96) > at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:158) > at > com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119) > at > com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:389) > at > com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:371) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355) > at > 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IllegalAccessError: > org/apache/commons/logging/impl/LogFactoryImpl$3 > at > org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700) > at > org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187) > at > org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914) > at > org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604) > ... 26 more > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8538. - Resolution: Fixed Fix Version/s: 1.6.0 1.5.0 Fixed in 1.6.0: 2450d2b24006a4db846b9b688f9b598e3fdf7c6e & d9f2f2f83c8a099e687e158705375db1b56dca01 Fixed in 1.5.0: 1d26062de130c05fdbe7701b55766b4a8d433418 & db2c510fb4f171c9e9940759e5fbaf466ec74474 > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Fix For: 1.5.0, 1.6.0 > > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379148#comment-16379148 ] ASF GitHub Bot commented on FLINK-8538: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5564 > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5564 ---
[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()
[ https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379074#comment-16379074 ] ASF GitHub Bot commented on FLINK-8667: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5500 @tillrohrmann @kl0u Thanks for reviewing, guys. As @pnowojski mentioned, we three decided to expose timer keys in `ProcessFunction` in [FLINK-8560](https://github.com/apache/flink/pull/5481). Exposing timer keys in `KeyedBroadcastProcessFunction` extends that design. I think we should get this PR into 1.5.0 so we don't need to do the [complicated refactoring for FLINK-8560](https://github.com/apache/flink/pull/5481) to support backward compatibility. > expose key in KeyedBroadcastProcessFunction#onTimer() > - > > Key: FLINK-8667 > URL: https://issues.apache.org/jira/browse/FLINK-8667 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > [~aljoscha] [~pnowojski] > Since KeyedBroadcastProcessFunction is about to get out of the door, I think > it will be great to expose the timer's key in KeyedBroadcastProcessFunction > too. If we don't do it now, it will be much more difficult to add the feature > later on because of user-app compatibility issues. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5500 @tillrohrmann @kl0u Thanks for reviewing, guys. As @pnowojski mentioned, we three decided to expose timer keys in `ProcessFunction` in [FLINK-8560](https://github.com/apache/flink/pull/5481). Exposing timer keys in `KeyedBroadcastProcessFunction` extends that design. I think we should get this PR into 1.5.0 so we don't need to do the [complicated refactoring for FLINK-8560](https://github.com/apache/flink/pull/5481) to support backward compatibility. ---
[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView
[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379063#comment-16379063 ] ASF GitHub Bot commented on FLINK-8689: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/ I agree this design is much cleaner and easier to maintain later. I was hesitating to change the function signature of `generateAggregations()`. I will try to introduce some common building blocks and handle the implementation effort in codegen. I imagine a lot of ITCases will be needed :-) Thanks for the feedback @hequn8128 @fhueske > Add runtime support of distinct filter using MapView > - > > Key: FLINK-8689 > URL: https://issues.apache.org/jira/browse/FLINK-8689 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > This ticket should cover distinct aggregate function support to codegen for > *AggregateCall*, where the *isDistinct* field is set to true. > This can be verified using the following SQL, which is not currently > producing correct results. > {code:java} > SELECT > a, > SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND > CURRENT ROW) > FROM > MyTable{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/ I agree this design is much cleaner and easier to maintain later. I was hesitating to change the function signature of `generateAggregations()`. I will try to introduce some common building blocks and handle the implementation effort in codegen. I imagine a lot of ITCases will be needed :-) Thanks for the feedback @hequn8128 @fhueske ---
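The core idea behind a MapView-backed distinct filter can be sketched with plain collections. This is a minimal stand-in, not the flink-table API: the class and method names are illustrative, and a `HashMap` replaces Flink's state-backed `MapView`. The filter keeps a count per value, forwards a value to the inner accumulator only on its first occurrence, and retracts it from the inner accumulator only when its count drops back to zero (retraction is what bounded OVER windows, like the 5-PRECEDING example in the ticket, need).

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative distinct filter wrapped around a SUM accumulator.
public class DistinctFilterSketch {
    // Per-value reference counts; a MapView would hold these in state.
    private final Map<Long, Integer> counts = new HashMap<>();
    private long sum; // inner SUM accumulator

    // Forward to the inner aggregate only on the first occurrence.
    public void accumulate(long value) {
        int c = counts.merge(value, 1, Integer::sum);
        if (c == 1) {
            sum += value;
        }
    }

    // Retract from the inner aggregate only when the last copy leaves.
    public void retract(long value) {
        Integer c = counts.computeIfPresent(value, (k, v) -> v - 1);
        if (c != null && c == 0) {
            counts.remove(value);
            sum -= value;
        }
    }

    public long getValue() {
        return sum;
    }
}
```

With inputs 5, 5, 3 the distinct sum is 8; retracting one 5 leaves it at 8 (a duplicate left), and retracting the second 5 drops it to 3.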
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379044#comment-16379044 ] ASF GitHub Bot commented on FLINK-8538: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5564 Thank you @fhueske. Will merge... > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5564: [FLINK-8538] [table] Add a Kafka table source factory wit...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5564 Thank you @fhueske. Will merge... ---
[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support
[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379026#comment-16379026 ] ASF GitHub Bot commented on FLINK-8538: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5564 Thanks for the update. I think this is good to merge. > Add a Kafka table source factory with JSON format support > - > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. This issue includes improving the existing JSON descriptor with > validation that can be used for other connectors as well. It is up for > discussion if we want to split the KafkaJsonTableSource into connector and > format such that we can reuse the format for other table sources as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5564: [FLINK-8538] [table] Add a Kafka table source factory wit...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5564 Thanks for the update. I think this is good to merge. ---
[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent
[ https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378993#comment-16378993 ] ASF GitHub Bot commented on FLINK-8750: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5588#discussion_r171001535 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java --- @@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) { * * @param buffer the buffer to peak into * @param eventClass the expected class of the event type -* @param classLoader the class loader to use for custom event classes * @return whether the event class of the buffer matches the given eventClass */ - private static boolean isEvent(ByteBuffer buffer, Class eventClass, ClassLoader classLoader) throws IOException { + private static boolean isEvent(ByteBuffer buffer, Class eventClass) throws IOException { --- End diff -- You should also add a comment, that checking for custom events is not supported. > InputGate may contain data after an EndOfPartitionEvent > --- > > Key: FLINK-8750 > URL: https://issues.apache.org/jira/browse/FLINK-8750 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.5.0 > > > The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates > that there was still some data after an {{EndOfPartitionEvent}} or that > {{BufferOrEvent#moreAvailable}} contained the wrong value: > {code} > testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase) > Time elapsed: 4.611 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: null > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173) > at > org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent
[ https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378989#comment-16378989 ] ASF GitHub Bot commented on FLINK-8750: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5588#discussion_r171001301 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java --- @@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, Class eventClass, ClassLoad try { int type = buffer.getInt(); - switch (type) { - case END_OF_PARTITION_EVENT: - return eventClass.equals(EndOfPartitionEvent.class); - case CHECKPOINT_BARRIER_EVENT: - return eventClass.equals(CheckpointBarrier.class); - case END_OF_SUPERSTEP_EVENT: - return eventClass.equals(EndOfSuperstepEvent.class); - case CANCEL_CHECKPOINT_MARKER_EVENT: - return eventClass.equals(CancelCheckpointMarker.class); - case OTHER_EVENT: - try { - final DataInputDeserializer deserializer = new DataInputDeserializer(buffer); - final String className = deserializer.readUTF(); - - final Class clazz; - try { - clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class); - } - catch (ClassNotFoundException e) { - throw new IOException("Could not load event class '" + className + "'.", e); - } - catch (ClassCastException e) { - throw new IOException("The class '" + className + "' is not a valid subclass of '" - + AbstractEvent.class.getName() + "'.", e); - } - return eventClass.equals(clazz); - } - catch (Exception e) { - throw new IOException("Error while deserializing or instantiating event.", e); - } - default: - throw new IOException("Corrupt byte stream for event"); + if (eventClass.equals(EndOfPartitionEvent.class)) { + return type == END_OF_PARTITION_EVENT; + } else if (eventClass.equals(CheckpointBarrier.class)) { + return type == CHECKPOINT_BARRIER_EVENT; + } else if (eventClass.equals(EndOfSuperstepEvent.class)) { + return type == 
END_OF_SUPERSTEP_EVENT; + } else if (eventClass.equals(CancelCheckpointMarker.class)) { + return type == CANCEL_CHECKPOINT_MARKER_EVENT; + } else { + throw new IOException("Corrupt byte stream for event or unsupported eventClass = " + eventClass); --- End diff -- Actually, this should be an `UnsupportedOperationException` since this is only based on the class being given and not the input stream. > InputGate may contain data after an EndOfPartitionEvent > --- > > Key: FLINK-8750 > URL: https://issues.apache.org/jira/browse/FLINK-8750 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.5.0 > > > The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates > that there was still some data after an {{EndOfPartitionEvent}} or that > {{BufferOrEvent#moreAvailable}} contained the wrong value: > {code} > testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase) > Time elapsed: 4.611 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at >
[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent
[ https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378992#comment-16378992 ] ASF GitHub Bot commented on FLINK-8750: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5588#discussion_r171000868 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java --- @@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) { * * @param buffer the buffer to peak into * @param eventClass the expected class of the event type -* @param classLoader the class loader to use for custom event classes * @return whether the event class of the buffer matches the given eventClass */ - private static boolean isEvent(ByteBuffer buffer, Class eventClass, ClassLoader classLoader) throws IOException { + private static boolean isEvent(ByteBuffer buffer, Class eventClass) throws IOException { --- End diff -- this change qualifies for a separate JIRA ticket, not just a hotfix > InputGate may contain data after an EndOfPartitionEvent > --- > > Key: FLINK-8750 > URL: https://issues.apache.org/jira/browse/FLINK-8750 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.5.0 > > > The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates > that there was still some data after an {{EndOfPartitionEvent}} or that > {{BufferOrEvent#moreAvailable}} contained the wrong value: > {code} > testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase) > Time elapsed: 4.611 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: null > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173) > at > org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent
[ https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378994#comment-16378994 ] ASF GitHub Bot commented on FLINK-8750: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5588#discussion_r171004092 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java --- @@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception { } /** -* Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns +* Tests {@link EventSerializer#isEvent(Buffer, Class)} returns * the correct answer for various encoded event buffers. */ @Test public void testIsEvent() throws Exception { AbstractEvent[] events = { EndOfPartitionEvent.INSTANCE, - EndOfSuperstepEvent.INSTANCE, new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()), new TestTaskEvent(Math.random(), 12361231273L), - new CancelCheckpointMarker(287087987329842L) + new CancelCheckpointMarker(287087987329842L), + EndOfSuperstepEvent.INSTANCE + }; + + Class[] expectedClasses = { + EndOfPartitionEvent.class, + CheckpointBarrier.class, + CancelCheckpointMarker.class, + EndOfSuperstepEvent.class --- End diff -- This extra array seems a bit error-prone and requires maintenance in case the events are extended - wouldn't it be equally clear if we used your new naming with the original array? 
> InputGate may contain data after an EndOfPartitionEvent
> -------------------------------------------------------
>
>                 Key: FLINK-8750
>                 URL: https://issues.apache.org/jira/browse/FLINK-8750
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: Nico Kruber
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates
> that there was still some data after an {{EndOfPartitionEvent}} or that
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)
> Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
> 	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
> 	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
> 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
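The reviewer's concern above — a second `expectedClasses` array that must be kept in sync by hand whenever the events are extended — can be sketched outside of Flink with plain stand-in classes. Everything below is hypothetical illustration, not Flink's actual test code: the expected class is derived from each event instance, so no parallel structure exists to drift out of sync.

```java
public class ParallelArraySketch {
    // Hypothetical stand-ins for Flink's event types; illustration only.
    static class EndOfPartitionEvent {}
    static class CheckpointBarrier {}
    static class CancelCheckpointMarker {}
    static class EndOfSuperstepEvent {}

    public static void main(String[] args) {
        Object[] events = {
            new EndOfPartitionEvent(),
            new CheckpointBarrier(),
            new CancelCheckpointMarker(),
            new EndOfSuperstepEvent(),
        };

        // Instead of a second, hand-maintained Class[] array, derive the
        // expected class from each event instance: extending `events` can
        // then never drift out of sync with the expectations.
        Class<?>[] expectedClasses = new Class<?>[events.length];
        for (int i = 0; i < events.length; i++) {
            expectedClasses[i] = events[i].getClass();
        }

        for (int i = 0; i < events.length; i++) {
            if (!expectedClasses[i].isInstance(events[i])) {
                throw new AssertionError("mismatch at index " + i);
            }
        }
        System.out.println("all " + events.length + " event/class pairs match");
    }
}
```

This is only one way to realize the suggestion; the review itself proposes reusing the reordered original array with the new naming, which achieves the same single-source-of-truth property.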
[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent
[ https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378991#comment-16378991 ]

ASF GitHub Bot commented on FLINK-8750:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171002283

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---
    @@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception {
     	}

     	/**
    -	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns
    +	 * Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
     	 * the correct answer for various encoded event buffers.
     	 */
     	@Test
     	public void testIsEvent() throws Exception {
     		AbstractEvent[] events = {
     			EndOfPartitionEvent.INSTANCE,
    -			EndOfSuperstepEvent.INSTANCE,
     			new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
     			new TestTaskEvent(Math.random(), 12361231273L),
    -			new CancelCheckpointMarker(287087987329842L)
    +			new CancelCheckpointMarker(287087987329842L),
    +			EndOfSuperstepEvent.INSTANCE
    +		};
    --- End diff --

    I wonder why the order of the events changed?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent
[ https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378990#comment-16378990 ]

ASF GitHub Bot commented on FLINK-8750:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171001862

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
    @@ -318,13 +295,9 @@ public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) t
     	 *
     	 * @param buffer the buffer to peak into
     	 * @param eventClass the expected class of the event type
    -	 * @param classLoader the class loader to use for custom event classes
     	 * @return whether the event class of the buffer matches the given eventClass
     	 */
    -	public static boolean isEvent(final Buffer buffer,
    -			final Class eventClass,
    -			final ClassLoader classLoader) throws IOException {
    -		return !buffer.isBuffer() &&
    -			isEvent(buffer.getNioBufferReadable(), eventClass, classLoader);
    +	public static boolean isEvent(Buffer buffer, Class eventClass) throws IOException {
    +		return !buffer.isBuffer() && isEvent(buffer.getNioBufferReadable(), eventClass);
    --- End diff --

    similar here: add a comment, that checking for custom events is not supported anymore

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171001535

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
    @@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) {
     	 *
     	 * @param buffer the buffer to peak into
     	 * @param eventClass the expected class of the event type
    -	 * @param classLoader the class loader to use for custom event classes
     	 * @return whether the event class of the buffer matches the given eventClass
     	 */
    -	private static boolean isEvent(ByteBuffer buffer, Class eventClass, ClassLoader classLoader) throws IOException {
    +	private static boolean isEvent(ByteBuffer buffer, Class eventClass) throws IOException {
    --- End diff --

    You should also add a comment, that checking for custom events is not supported.

---
[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171001301

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
    @@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, Class eventClass, ClassLoad
     		try {
     			int type = buffer.getInt();

    -			switch (type) {
    -				case END_OF_PARTITION_EVENT:
    -					return eventClass.equals(EndOfPartitionEvent.class);
    -				case CHECKPOINT_BARRIER_EVENT:
    -					return eventClass.equals(CheckpointBarrier.class);
    -				case END_OF_SUPERSTEP_EVENT:
    -					return eventClass.equals(EndOfSuperstepEvent.class);
    -				case CANCEL_CHECKPOINT_MARKER_EVENT:
    -					return eventClass.equals(CancelCheckpointMarker.class);
    -				case OTHER_EVENT:
    -					try {
    -						final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
    -						final String className = deserializer.readUTF();
    -
    -						final Class clazz;
    -						try {
    -							clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
    -						}
    -						catch (ClassNotFoundException e) {
    -							throw new IOException("Could not load event class '" + className + "'.", e);
    -						}
    -						catch (ClassCastException e) {
    -							throw new IOException("The class '" + className + "' is not a valid subclass of '"
    -								+ AbstractEvent.class.getName() + "'.", e);
    -						}
    -						return eventClass.equals(clazz);
    -					}
    -					catch (Exception e) {
    -						throw new IOException("Error while deserializing or instantiating event.", e);
    -					}
    -				default:
    -					throw new IOException("Corrupt byte stream for event");
    +			if (eventClass.equals(EndOfPartitionEvent.class)) {
    +				return type == END_OF_PARTITION_EVENT;
    +			} else if (eventClass.equals(CheckpointBarrier.class)) {
    +				return type == CHECKPOINT_BARRIER_EVENT;
    +			} else if (eventClass.equals(EndOfSuperstepEvent.class)) {
    +				return type == END_OF_SUPERSTEP_EVENT;
    +			} else if (eventClass.equals(CancelCheckpointMarker.class)) {
    +				return type == CANCEL_CHECKPOINT_MARKER_EVENT;
    +			} else {
    +				throw new IOException("Corrupt byte stream for event or unsupported eventClass = " + eventClass);
    --- End diff --

    Actually, this should be an `UnsupportedOperationException` since this is only based on the class being given and not the input stream.

---
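The refactoring under review replaces the type-tag switch with a dispatch keyed on the expected class, and the reviewer suggests throwing `UnsupportedOperationException` for unknown classes since the problem then lies in the argument, not the byte stream. A minimal standalone sketch of that shape follows; the type-tag constants and marker classes are stand-ins, not Flink's actual values:

```java
public class IsEventDispatchSketch {
    // Stand-in type tags mirroring the serializer's event-type constants
    // (values are assumptions for illustration; Flink's real constants differ).
    static final int END_OF_PARTITION_EVENT = 0;
    static final int CHECKPOINT_BARRIER_EVENT = 1;
    static final int END_OF_SUPERSTEP_EVENT = 2;
    static final int CANCEL_CHECKPOINT_MARKER_EVENT = 3;

    // Stand-in marker classes for the built-in events.
    static class EndOfPartitionEvent {}
    static class CheckpointBarrier {}
    static class EndOfSuperstepEvent {}
    static class CancelCheckpointMarker {}

    // Maps the expected event class to its serialized type tag and compares it
    // with the tag read from the buffer. Unknown (e.g. custom) event classes are
    // rejected up front with UnsupportedOperationException, as the review
    // suggests, because the class argument is at fault, not the stream.
    static boolean isEvent(int typeFromBuffer, Class<?> eventClass) {
        final int expectedType;
        if (eventClass.equals(EndOfPartitionEvent.class)) {
            expectedType = END_OF_PARTITION_EVENT;
        } else if (eventClass.equals(CheckpointBarrier.class)) {
            expectedType = CHECKPOINT_BARRIER_EVENT;
        } else if (eventClass.equals(EndOfSuperstepEvent.class)) {
            expectedType = END_OF_SUPERSTEP_EVENT;
        } else if (eventClass.equals(CancelCheckpointMarker.class)) {
            expectedType = CANCEL_CHECKPOINT_MARKER_EVENT;
        } else {
            throw new UnsupportedOperationException(
                "unsupported eventClass = " + eventClass.getName());
        }
        return typeFromBuffer == expectedType;
    }

    public static void main(String[] args) {
        if (!isEvent(END_OF_PARTITION_EVENT, EndOfPartitionEvent.class)) {
            throw new AssertionError("expected match");
        }
        if (isEvent(CHECKPOINT_BARRIER_EVENT, EndOfPartitionEvent.class)) {
            throw new AssertionError("expected mismatch");
        }
        System.out.println("dispatch sketch OK");
    }
}
```

Note the trade-off also visible in the diff: keying on the class means only the known built-in event classes can be checked, which is why the earlier comments ask for a note that custom events are no longer supported.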
[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171000868

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
    @@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) {
     	 *
     	 * @param buffer the buffer to peak into
     	 * @param eventClass the expected class of the event type
    -	 * @param classLoader the class loader to use for custom event classes
     	 * @return whether the event class of the buffer matches the given eventClass
     	 */
    -	private static boolean isEvent(ByteBuffer buffer, Class eventClass, ClassLoader classLoader) throws IOException {
    +	private static boolean isEvent(ByteBuffer buffer, Class eventClass) throws IOException {
    --- End diff --

    this change qualifies for a separate JIRA ticket, not just a hotfix

---
[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available
[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378965#comment-16378965 ]

ASF GitHub Bot commented on FLINK-8755:
---------------------------------------

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/5581

    rebased (not including newest FLINK-8694 changes!) and addressed the comments

> SpilledSubpartitionView wrongly relys on the backlog for determining whether
> more data is available
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8755
>                 URL: https://issues.apache.org/jira/browse/FLINK-8755
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
> 	//...
> 	int newBacklog = parent.decreaseBuffersInBacklog(current);
> 	return new BufferAndBacklog(current, newBacklog > 0, newBacklog, nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there
> are only events left in the buffer queue, their buffers are not included in
> the backlog counting and therefore, {{isMoreAvailable}} will be wrongly
> {{false}} here.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
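The bug in the issue description can be illustrated with a toy model (all names below are stand-ins, not Flink's actual classes): when the backlog counts only data buffers, deriving `isMoreAvailable` from `backlog > 0` reports false even though an event still sits in the queue.

```java
import java.util.ArrayDeque;

public class BacklogSketch {
    // Toy queue entry: either a data buffer or an event (stand-in only).
    static class Entry {
        final boolean isBuffer;
        Entry(boolean isBuffer) { this.isBuffer = isBuffer; }
    }

    public static void main(String[] args) {
        ArrayDeque<Entry> queue = new ArrayDeque<>();
        queue.add(new Entry(true));   // one data buffer
        queue.add(new Entry(false));  // followed by an event, e.g. end-of-partition

        // The backlog counts data buffers only, as in the issue description.
        int backlog = 0;
        for (Entry e : queue) {
            if (e.isBuffer) {
                backlog++;
            }
        }

        Entry next = queue.poll();    // hand out the data buffer
        if (next.isBuffer) {
            backlog--;
        }

        boolean moreAvailableFromBacklog = backlog > 0;    // the buggy derivation
        boolean moreAvailableActually = !queue.isEmpty();  // ground truth: an event is still queued

        // prints "false vs true": the consumer would wrongly stop polling
        System.out.println(moreAvailableFromBacklog + " vs " + moreAvailableActually);
    }
}
```

The fix direction implied by the ticket is to base availability on the actual queue contents (buffers and events) rather than on the buffer-only backlog counter.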
[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available
[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378940#comment-16378940 ]

ASF GitHub Bot commented on FLINK-8755:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r170993496

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -222,59 +219,24 @@ public void testConsumeSpilledPartition() throws Exception {
     		assertEquals(1, listener.getNumNotifications());

     		assertFalse(reader.nextBufferIsEvent()); // buffer
    -		BufferAndBacklog read = reader.getNextBuffer();
    -		assertNotNull(read);
    -		assertTrue(read.buffer().isBuffer());
    +		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
     		assertEquals(2, partition.getBuffersInBacklog());
    -		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
    -		assertFalse(read.buffer().isRecycled());
    -		read.buffer().recycleBuffer();
    -		assertTrue(read.buffer().isRecycled());
    -		assertFalse(read.nextBufferIsEvent());

     		assertFalse(reader.nextBufferIsEvent()); // buffer
    -		read = reader.getNextBuffer();
    -		assertNotNull(read);
    -		assertTrue(read.buffer().isBuffer());
    +		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
     		assertEquals(1, partition.getBuffersInBacklog());
    -		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
    -		assertFalse(read.buffer().isRecycled());
    -		read.buffer().recycleBuffer();
    -		assertTrue(read.buffer().isRecycled());
    -		assertTrue(read.nextBufferIsEvent());

     		assertTrue(reader.nextBufferIsEvent()); // event
    -		read = reader.getNextBuffer();
    -		assertNotNull(read);
    -		assertFalse(read.buffer().isBuffer());
    +		assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, true);
    --- End diff --

    almost - it remains `@Nullable` in `SubpartitionTestBase#assertNextBufferOrEvent`

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relys on the backlog for determining whether more data is available
[ https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378926#comment-16378926 ]

ASF GitHub Bot commented on FLINK-8755:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5581#discussion_r170990471

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -138,11 +145,68 @@ static void assertNextBuffer(
     			ResultSubpartitionView readView,
     			int expectedReadableBufferSize,
     			boolean expectedIsMoreAvailable,
    -			int expectedBuffersInBacklog) throws IOException, InterruptedException {
    +			int expectedBuffersInBacklog,
    +			boolean expectedNextBufferIsEvent,
    +			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +		assertNextBufferOrEvent(
    +			readView,
    +			expectedReadableBufferSize,
    +			true,
    +			null,
    +			expectedIsMoreAvailable,
    +			expectedBuffersInBacklog,
    +			expectedNextBufferIsEvent,
    +			expectedRecycledAfterRecycle);
    +	}
    +
    +	static void assertNextEvent(
    +			ResultSubpartitionView readView,
    +			int expectedReadableBufferSize,
    +			@Nullable Class expectedEventClass,
    +			boolean expectedIsMoreAvailable,
    +			int expectedBuffersInBacklog,
    +			boolean expectedNextBufferIsEvent,
    +			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +		assertNextBufferOrEvent(
    +			readView,
    +			expectedReadableBufferSize,
    +			false,
    +			expectedEventClass,
    +			expectedIsMoreAvailable,
    +			expectedBuffersInBacklog,
    +			expectedNextBufferIsEvent,
    +			expectedRecycledAfterRecycle);
    +	}
    +
    +	private static void assertNextBufferOrEvent(
    +			ResultSubpartitionView readView,
    +			int expectedReadableBufferSize,
    +			boolean expectedIsBuffer,
    +			Class expectedEventClass,
    +			boolean expectedIsMoreAvailable,
    +			int expectedBuffersInBacklog,
    +			boolean expectedNextBufferIsEvent,
    +			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
    +		checkArgument(expectedEventClass == null || !expectedIsBuffer);
    +
     		ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
    -		assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
    -		assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
    -		assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
    +		assertNotNull(bufferAndBacklog);
    +
    +		assertEquals("buffer size", expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
    --- End diff --

    Unfortunately yes:
    - with the string:
    ```
    java.lang.AssertionError: buffer size
    Expected :1025
    Actual   :1024
    ```
    - without the string:
    ```
    java.lang.AssertionError:
    Expected :1025
    Actual   :1024
    ```
    It is even worse for boolean values as you may imagine. This way, you can immediately get to your test to fix the assumption and do not have to click into `SubpartitionTestBase` to identify what was actually wrong.
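The point being argued — that a message string on the assertion names the failing check — can be demonstrated with a minimal stand-in for JUnit's `assertEquals(String, ...)` overload (the helper below is hypothetical, written only to show the shape of the failure message):

```java
public class AssertMessageSketch {
    // Minimal stand-in for JUnit's assertEquals(String message, expected, actual).
    static void assertEquals(String message, Object expected, Object actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError(
                message + " expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    public static void main(String[] args) {
        try {
            assertEquals("buffer size", 1025, 1024);
        } catch (AssertionError e) {
            // The failure now names the field being checked, so the broken
            // assumption is identifiable without opening the shared test base.
            System.out.println(e.getMessage());
        }
    }
}
```

Running this prints `buffer size expected:<1025> but was:<1024>`, which is exactly the diagnostic benefit the reviewer describes for boolean-heavy assertion helpers.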
[jira] [Commented] (FLINK-8737) Creating a union of UnionGate instances will fail with UnsupportedOperationException when retrieving buffers
[ https://issues.apache.org/jira/browse/FLINK-8737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378912#comment-16378912 ]

ASF GitHub Bot commented on FLINK-8737:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5583#discussion_r170988543

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
    @@ -61,7 +61,7 @@
     * ++
     *
     *
    -* It is possible to recursively union union input gates.
    +* It is NOT possible to recursively union union input gates.
    --- End diff --

    That would be a more invasive change in several places using `InputGate`s and we actually are able to use any other input gate types (if there were some) - just the recursive union does not make sense. So I guess, excluding ourself is the better approach here in terms of extensibility. Think of it as we're excluding this use not because we did not implement `pollNextBuffer` but because it does not make sense and we do not want this recursion.

> Creating a union of UnionGate instances will fail with
> UnsupportedOperationException when retrieving buffers
> ------------------------------------------------------
>
>                 Key: FLINK-8737
>                 URL: https://issues.apache.org/jira/browse/FLINK-8737
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> FLINK-8589 introduced a new polling method but did not implement
> {{UnionInputGate#pollNextBufferOrEvent()}}. This prevents UnionGate instances
> from containing a UnionGate instance which is explicitly allowed by its
> documentation and interface.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
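One way a "no recursive unions" restriction like the one discussed above could be enforced is to reject nested union gates at construction time instead of failing later when buffers are polled. This is a hypothetical sketch with stand-in types, not Flink's actual `UnionInputGate` code, and the actual PR may enforce the restriction differently:

```java
public class UnionGateGuardSketch {
    // Minimal stand-in hierarchy; names are assumptions, not Flink's classes.
    interface InputGate {}
    static class SingleInputGate implements InputGate {}

    static class UnionInputGate implements InputGate {
        final InputGate[] inputGates;

        UnionInputGate(InputGate... inputGates) {
            // Reject recursive unions up front, rather than letting a nested
            // union surface later as an UnsupportedOperationException when
            // buffers are retrieved.
            for (InputGate gate : inputGates) {
                if (gate instanceof UnionInputGate) {
                    throw new IllegalArgumentException(
                        "recursive union of union input gates is not supported");
                }
            }
            this.inputGates = inputGates;
        }
    }

    public static void main(String[] args) {
        UnionInputGate ok =
            new UnionInputGate(new SingleInputGate(), new SingleInputGate());
        boolean rejected = false;
        try {
            new UnionInputGate(ok, new SingleInputGate()); // nested union
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        System.out.println("recursive union rejected: " + rejected);
    }
}
```

Failing fast in the constructor makes the documented restriction visible at wiring time; the reviewer's argument above is that excluding only the recursive case keeps the design open to other `InputGate` implementations.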
[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378910#comment-16378910 ] ASF GitHub Bot commented on FLINK-8756: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5573 @yanghua I will take a look this week. > Support ClusterClient.getAccumulators() in RestClusterClient > > > Key: FLINK-8756 > URL: https://issues.apache.org/jira/browse/FLINK-8756 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: vinoyang >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8737) Creating a union of UnionGate instances will fail with UnsupportedOperationException when retrieving buffers
[ https://issues.apache.org/jira/browse/FLINK-8737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378907#comment-16378907 ] ASF GitHub Bot commented on FLINK-8737: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5583#discussion_r170987211 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java --- @@ -189,11 +189,11 @@ public void requestPartitions() throws IOException, InterruptedException { bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - return Optional.ofNullable(bufferOrEvent); + return Optional.of(bufferOrEvent); } @Override - public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException { + public Optional<BufferOrEvent> pollNextBufferOrEvent() throws UnsupportedOperationException { --- End diff -- we don't need to - I thought this makes it more explicit if anyone tries to use this method in an IDE without looking at the code (which is a mistake imho, but commonly done) > Creating a union of UnionGate instances will fail with > UnsupportedOperationException when retrieving buffers > > > Key: FLINK-8737 > URL: https://issues.apache.org/jira/browse/FLINK-8737 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0 > > > FLINK-8589 introduced a new polling method but did not implement > {{UnionInputGate#pollNextBufferOrEvent()}}. This prevents UnionGate instances > from containing a UnionGate instance which is explicitly allowed by its > documentation and interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
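The two changes in the diff above can be illustrated in isolation — a hedged sketch, not Flink code: `SimpleGate` and its method names are hypothetical stand-ins for the `UnionInputGate` pattern under review.

```java
import java.util.Optional;

// Hypothetical stand-in for the gate pattern discussed in the review above.
class SimpleGate {
    // When the value is known to be non-null at this point, Optional.of
    // documents that invariant and fails fast with an NPE; Optional.ofNullable
    // would silently convert a bug into an empty Optional.
    Optional<String> getNextBufferOrEvent(String bufferOrEvent) {
        return Optional.of(bufferOrEvent);
    }

    // Explicitly unsupported: recursively unioning union gates does not make
    // sense, so the method advertises the restriction in its signature and
    // body instead of half-implementing it.
    Optional<String> pollNextBufferOrEvent() throws UnsupportedOperationException {
        throw new UnsupportedOperationException(
            "Cannot poll from a union of union input gates.");
    }
}
```

The declared `throws UnsupportedOperationException` is redundant for an unchecked exception, which is exactly the reviewer's point: it exists purely to make the restriction visible in an IDE.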
[jira] [Assigned] (FLINK-8336) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability
[ https://issues.apache.org/jira/browse/FLINK-8336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber reassigned FLINK-8336: -- Assignee: Nico Kruber > YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability > --- > > Key: FLINK-8336 > URL: https://issues.apache.org/jira/browse/FLINK-8336 > Project: Flink > Issue Type: Bug > Components: FileSystem, Tests, YARN >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Nico Kruber >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.3 > > > The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3}} fails on > Travis. I suspect that this has something to do with the consistency > guarantees S3 gives us. > https://travis-ci.org/tillrohrmann/flink/jobs/323930297 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378890#comment-16378890 ] ASF GitHub Bot commented on FLINK-8777: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r170975985 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -90,6 +92,9 @@ @GuardedBy("lock") private boolean disposed; + /** Whether to discard the useless state when retrieve local checkpoint state. */ + private boolean retrieveWithDiscard = true; --- End diff -- Why do we need this? Is there any case for not doing the cleanup? > improve resource release when recovery from failover > > > Key: FLINK-8777 > URL: https://issues.apache.org/jira/browse/FLINK-8777 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > When recovery from failed, {{TaskLocalStateStoreImpl.retrieveLocalState()}} > will be invoked, we can release all entry from > {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID > == checkpointID}}, this can prevent the resource leak when job loop in > {{local checkpoint completed => failed => local checkpoint completed => > failed ...}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378891#comment-16378891 ] ASF GitHub Bot commented on FLINK-8777: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r170982734 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException { } } + /** +* Pruning the useless checkpoints. +*/ + private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) { + + Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator = + storedTaskStateByCheckpointID.entrySet().iterator(); + + final List<Long> toRemove = new ArrayList<>(); + + while (entryIterator.hasNext()) { + + Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next(); + long entryCheckpointId = snapshotEntry.getKey(); + + if (entryCheckpointId != checkpointID) { --- End diff -- After a second thought: while I think this code is currently correct, the case with breaking looks a bit dangerous. Potentially, if the checkpoint id is not there, this would not stop and could prune ongoing checkpoints. I wonder if we should make the `if` a bit more complex, but safer (checking that the breaking case never exceeds the checkpoint id). What do you think? 
> improve resource release when recovery from failover > > > Key: FLINK-8777 > URL: https://issues.apache.org/jira/browse/FLINK-8777 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > When recovery from failed, {{TaskLocalStateStoreImpl.retrieveLocalState()}} > will be invoked, we can release all entry from > {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID > == checkpointID}}, this can prevent the resource leak when job loop in > {{local checkpoint completed => failed => local checkpoint completed => > failed ...}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
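The safer variant suggested in the review above — never pruning past the confirmed checkpoint id when iterating in ascending order — could look roughly like this. This is only a sketch of the idea, not the actual `TaskLocalStateStoreImpl` code: a `TreeMap<Long, String>` stands in for the real checkpoint-id-to-snapshot map.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

class CheckpointPruner {
    // Maps checkpoint id -> snapshot (String stands in for TaskStateSnapshot).
    // TreeMap guarantees ascending iteration order over checkpoint ids.
    final TreeMap<Long, String> storedByCheckpointId = new TreeMap<>();

    /**
     * Removes all entries other than {@code checkpointId}. When
     * {@code breakOnMatch} is set, iteration stops at the first id that is
     * >= checkpointId, so newer (possibly ongoing) checkpoints survive even
     * if checkpointId itself is absent from the map.
     */
    void pruneCheckpoints(long checkpointId, boolean breakOnMatch) {
        Iterator<Map.Entry<Long, String>> it =
            storedByCheckpointId.entrySet().iterator();
        while (it.hasNext()) {
            long id = it.next().getKey();
            if (breakOnMatch && id >= checkpointId) {
                return; // never prune past the confirmed checkpoint id
            }
            if (id != checkpointId) {
                it.remove();
            }
        }
    }
}
```

With ids {1, 3, 5} and a prune at the absent id 4, only 1 and 3 are removed; the plain `!=` check alone would also have dropped 5.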
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1637#comment-1637 ] ASF GitHub Bot commented on FLINK-8777: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r170975600 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException { } } + /** +* Pruning the useless checkpoints. +*/ + private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) { + --- End diff -- I suggest to add an assert that the thread holds `lock` and document that this method should be called only when holding the lock. > improve resource release when recovery from failover > > > Key: FLINK-8777 > URL: https://issues.apache.org/jira/browse/FLINK-8777 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > When recovery from failed, {{TaskLocalStateStoreImpl.retrieveLocalState()}} > will be invoked, we can release all entry from > {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID > == checkpointID}}, this can prevent the resource leak when job loop in > {{local checkpoint completed => failed => local checkpoint completed => > failed ...}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r170975679 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) { TaskStateSnapshot snapshot; synchronized (lock) { snapshot = storedTaskStateByCheckpointID.get(checkpointID); + + if (retrieveWithDiscard) { + // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others --- End diff -- Comment is no longer required. ---
[jira] [Assigned] (FLINK-8408) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-8408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber reassigned FLINK-8408: -- Assignee: Nico Kruber > YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis > -- > > Key: FLINK-8408 > URL: https://issues.apache.org/jira/browse/FLINK-8408 > Project: Flink > Issue Type: Bug > Components: FileSystem, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Nico Kruber >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.3 > > > The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n}} is unstable > on Travis. > https://travis-ci.org/tillrohrmann/flink/jobs/327216460 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3
[ https://issues.apache.org/jira/browse/FLINK-8801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8801: --- Fix Version/s: 1.4.3 > S3's eventual consistent read-after-write may fail yarn deployment of > resources to S3 > - > > Key: FLINK-8801 > URL: https://issues.apache.org/jira/browse/FLINK-8801 > Project: Flink > Issue Type: Bug > Components: FileSystem, ResourceManager, YARN >Affects Versions: 1.4.0, 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > According to > https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel: > {quote} > Amazon S3 provides read-after-write consistency for PUTS of new objects in > your S3 bucket in all regions with one caveat. The caveat is that if you make > a HEAD or GET request to the key name (to find if the object exists) before > creating the object, Amazon S3 provides eventual consistency for > read-after-write. > {quote} > Some S3 file system implementations may actually execute such a request for > the about-to-write object and thus the read-after-write is only eventually > consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently > relies on a consistent read-after-write since it accesses the remote resource > to get file size and modification timestamp. Since there we have access to > the local resource, we can use the data from there instead and circumvent the > problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3
Nico Kruber created FLINK-8801: -- Summary: S3's eventual consistent read-after-write may fail yarn deployment of resources to S3 Key: FLINK-8801 URL: https://issues.apache.org/jira/browse/FLINK-8801 Project: Flink Issue Type: Bug Components: FileSystem, ResourceManager, YARN Affects Versions: 1.4.0, 1.5.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.5.0 According to https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel: {quote} Amazon S3 provides read-after-write consistency for PUTS of new objects in your S3 bucket in all regions with one caveat. The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides eventual consistency for read-after-write. {quote} Some S3 file system implementations may actually execute such a request for the about-to-write object and thus the read-after-write is only eventually consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently relies on a consistent read-after-write since it accesses the remote resource to get file size and modification timestamp. Since there we have access to the local resource, we can use the data from there instead and circumvent the problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
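The fix described in the ticket — taking file size and modification time from the local copy instead of re-reading the just-written remote object — can be sketched as follows. Hedged: `LocalResourceInfo` and `fromLocalFile` are illustrative names, not the actual `Utils#setupLocalResource()` code.

```java
import java.io.File;

// Hypothetical holder for the metadata YARN needs about an uploaded resource.
class LocalResourceInfo {
    final long size;
    final long modificationTime;

    LocalResourceInfo(long size, long modificationTime) {
        this.size = size;
        this.modificationTime = modificationTime;
    }

    // Derive the metadata from the local file we just uploaded, instead of a
    // HEAD/GET on the remote key: after a prior existence check, S3 only
    // guarantees eventual consistency for that read-after-write.
    static LocalResourceInfo fromLocalFile(File localFile) {
        return new LocalResourceInfo(localFile.length(), localFile.lastModified());
    }
}
```

The design point is that both values are already known locally, so the eventually-consistent remote read can be avoided entirely rather than retried.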
[jira] [Closed] (FLINK-8798) Make commons-logging a parent-first pattern
[ https://issues.apache.org/jira/browse/FLINK-8798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8798. --- > Make commons-logging a parent-first pattern > --- > > Key: FLINK-8798 > URL: https://issues.apache.org/jira/browse/FLINK-8798 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0, 1.4.2, 1.6.0 > > > The Apache {{commons-logging}} framework does not play well with child-first > classloading. > We need to make this a parent-first pattern. > As a matter of fact, other frameworks that use inverted classloading (JBoss, > Tomcat) force this library to always be parent-first as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8798) Make commons-logging a parent-first pattern
[ https://issues.apache.org/jira/browse/FLINK-8798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8798. - Resolution: Fixed Fixed in - 1.4.2 via 392cfaaed9380c5ea38b8593d23023925638cbe3 - 1.5.0 via a269f8519305faff153e84d729873b6f9497bd36 - 1.6.0 via 59fb56bc8378645b82ee31e1b3bd07e5045a3698 > Make commons-logging a parent-first pattern > --- > > Key: FLINK-8798 > URL: https://issues.apache.org/jira/browse/FLINK-8798 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0, 1.4.2, 1.6.0 > > > The Apache {{commons-logging}} framework does not play well with child-first > classloading. > We need to make this a parent-first pattern. > As a matter of fact, other frameworks that use inverted classloading (JBoss, > Tomcat) force this library to always be parent-first as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8787) Deploying FLIP-6 YARN session with HA fails
[ https://issues.apache.org/jira/browse/FLINK-8787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378868#comment-16378868 ] ASF GitHub Bot commented on FLINK-8787: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5591 [FLINK-8787][flip6] Deploying FLIP-6 YARN session with HA fails ## What is the purpose of the change *Do not copy `flinkConfiguration` in `AbstractYarnClusterDescriptor`. There is code that relies on side effects on the configuration object. This is a quick and dirty solution. In future, the descriptor should be made immutable: [FLINK-8799](https://issues.apache.org/jira/browse/FLINK-8799)* cc: @tillrohrmann ## Brief change log - *Do not copy `flinkConfiguration` in `AbstractYarnClusterDescriptor`* ## Verifying this change This change added tests and can be verified as follows: - *Manually deployed Flink cluster on YARN with HA enabled.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8787-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5591.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5591 commit 012f656705466cea4ba2e037f7e52f46c0a1bf9f Author: gyaoDate: 2018-02-27T15:58:53Z [FLINK-8787][flip6] Do not copy flinkConfiguration in AbstractYarnClusterDescriptor > Deploying FLIP-6 YARN session with HA fails > --- > > Key: FLINK-8787 > URL: https://issues.apache.org/jira/browse/FLINK-8787 > Project: Flink > Issue Type: Bug > Components: Client, YARN >Affects Versions: 1.5.0 > Environment: emr-5.12.0 > Hadoop distribution: Amazon 2.8.3 > Applications: ZooKeeper 3.4.10 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Starting a YARN session with HA in FLIP-6 mode fails with an exception. > Commit: 5e3fa4403f518dd6d3fe9970fe8ca55871add7c9 > Command to start YARN session: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > HADOOP_CONF_DIR=/etc/hadoop/conf bin/yarn-session.sh -d -n 1 -s 1 -jm 2048 > -tm 2048 > {noformat} > Stacktrace: > {noformat} > java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:790) > Caused by: org.apache.flink.util.FlinkException: Could not write the Yarn > connection information. 
> at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:612) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:790) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > ... 2 more > Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: > Could not retrieve the leader address and leader session ID. > at > org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:116) > at > org.apache.flink.client.program.rest.RestClusterClient.getClusterConnectionInfo(RestClusterClient.java:405) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:589) > ... 6 more > Caused by: java.util.concurrent.TimeoutException: Futures timed out after > [6 milliseconds] > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223) > at
[jira] [Created] (FLINK-8800) Set Logging to TRACE for org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler
Stephan Ewen created FLINK-8800: --- Summary: Set Logging to TRACE for org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler Key: FLINK-8800 URL: https://issues.apache.org/jira/browse/FLINK-8800 Project: Flink Issue Type: Bug Components: REST Reporter: Stephan Ewen Fix For: 1.5.0, 1.6.0 When setting the log level to {{DEBUG}}, the logs are swamped with statements as below, making it hard to read the debug logs. {code} 2018-02-22 13:41:04,016 DEBUG org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler - Received request /jobs/ec1c9d7a3c413a9523656efa58735009/vertices/ded95c643b42f31cf882a8986207fd30/metrics?get=0.currentLowWatermark. 2018-02-22 13:41:04,048 DEBUG org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler - Received request /jobs/ec1c9d7a3c413a9523656efa58735009/vertices/eec5890dac9c38f66954443809beb5b0/metrics?get=0.currentLowWatermark. 2018-02-22 13:41:04,052 DEBUG org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler - Received request /jobs/ec1c9d7a3c413a9523656efa58735009/vertices/2a964ee72788c82cb7d15e352d9a94f6/metrics?get=0.currentLowWatermark. 2018-02-22 13:41:04,079 DEBUG org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler - Received request /jobs/ec1c9d7a3c413a9523656efa58735009/vertices/1d9c83f6e1879fdbe461aafac16eb8a5/metrics?get=0.currentLowWatermark. 2018-02-22 13:41:04,085 DEBUG org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler - Received request /jobs/ec1c9d7a3c413a9523656efa58735009/vertices/4063620891a151092c5bcedb218870a6/metrics?get=0.currentLowWatermark. 2018-02-22 13:41:04,094 DEBUG org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler - Received request /jobs/ec1c9d7a3c413a9523656efa58735009/vertices/2a751c66e0e32aee2cd8120a1a72a4d6/metrics?get=0.currentLowWatermark. 
2018-02-22 13:41:04,142 DEBUG org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler - Received request /jobs/ec1c9d7a3c413a9523656efa58735009/vertices/37ecc85b429bd08d0fd539532055e117/metrics?get=0.currentLowWatermark. 2018-02-22 13:41:04,173 DEBUG org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler - Received request /jobs/ec1c9d7a3c413a9523656efa58735009/vertices/20e20298680571979f690d36d1a6db36/metrics?get=0.currentLowWatermark. {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8798) Make commons-logging a parent-first pattern
[ https://issues.apache.org/jira/browse/FLINK-8798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8798. - Resolution: Fixed Fixed in - 1.4.2 via 392cfaaed9380c5ea38b8593d23023925638cbe3 > Make commons-logging a parent-first pattern > --- > > Key: FLINK-8798 > URL: https://issues.apache.org/jira/browse/FLINK-8798 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0, 1.4.2, 1.6.0 > > > The Apache {{commons-logging}} framework does not play well with child-first > classloading. > We need to make this a parent-first pattern. > As a matter of fact, other frameworks that use inverted classloading (JBoss, > Tomcat) force this library to always be parent-first as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8799) Make AbstractYarnClusterDescriptor immutable
Gary Yao created FLINK-8799: --- Summary: Make AbstractYarnClusterDescriptor immutable Key: FLINK-8799 URL: https://issues.apache.org/jira/browse/FLINK-8799 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.5.0 Reporter: Gary Yao Fix For: 1.6.0 {{AbstractYarnClusterDescriptor}} should be made immutable. Currently, its internal configuration is modified from different places which makes it difficult to reason about the code. For example, it should not be possible to modify the {{zookeeperNamespace}} using a setter method. A user of this class should be forced to provide all information prior to creating the instance, e.g., by passing a {{org.apache.flink.configuration.Configuration}} object. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
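The immutability pattern the ticket calls for — all configuration supplied at construction time, no setters — might look like this. This is a generic sketch under stated assumptions: `ImmutableClusterSpec` is a hypothetical illustration, not the proposed Flink API.

```java
// Hypothetical illustration of an immutable descriptor: every field is final
// and provided up front, so no code path can mutate it after construction.
final class ImmutableClusterSpec {
    private final String zookeeperNamespace;
    private final int taskManagerCount;

    ImmutableClusterSpec(String zookeeperNamespace, int taskManagerCount) {
        this.zookeeperNamespace = zookeeperNamespace;
        this.taskManagerCount = taskManagerCount;
    }

    String getZookeeperNamespace() { return zookeeperNamespace; }
    int getTaskManagerCount() { return taskManagerCount; }

    // "Modification" yields a new instance; the original stays untouched,
    // so no caller can rely on side effects on a shared descriptor.
    ImmutableClusterSpec withZookeeperNamespace(String ns) {
        return new ImmutableClusterSpec(ns, taskManagerCount);
    }
}
```

This is exactly why FLINK-8787's quick fix (not copying the configuration) is labelled dirty in the PR above: code currently depends on side effects that an immutable descriptor would rule out by construction.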
[jira] [Reopened] (FLINK-8798) Make commons-logging a parent-first pattern
[ https://issues.apache.org/jira/browse/FLINK-8798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen reopened FLINK-8798: - prematurely closed > Make commons-logging a parent-first pattern > --- > > Key: FLINK-8798 > URL: https://issues.apache.org/jira/browse/FLINK-8798 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0, 1.4.2, 1.6.0 > > > The Apache {{commons-logging}} framework does not play well with child-first > classloading. > We need to make this a parent-first pattern. > As a matter of fact, other frameworks that use inverted classloading (JBoss, > Tomcat) force this library to be always parent-first as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5582: [FLINK-8790][State] Improve performance for recovery from...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5582 Thanks for the contribution! We are currently busy with the 1.5 release. I will have a closer look at this PR and your other pending JIRAs after the release is out. ---
[jira] [Commented] (FLINK-8790) Improve performance for recovery from incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378850#comment-16378850 ] ASF GitHub Bot commented on FLINK-8790: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5582 Thanks for the contribution! We are currently busy with the 1.5 release. I will have a closer look at this PR and your other pending JIRAs after the release is out. > Improve performance for recovery from incremental checkpoint > > > Key: FLINK-8790 > URL: https://issues.apache.org/jira/browse/FLINK-8790 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > When there are multiple state handles to be restored, we can improve the > performance as follows: > 1. Choose the best state handle to init the target db > 2. Use the other state handles to create temp dbs, and clip each db according > to the target key group range (via rocksdb.deleteRange()); this helps us > get rid of the `key group check` in the > `data insertion loop` and also avoids traversing useless > records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
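The clipping step (point 2 above) can be illustrated with a small stand-in: a sorted `TreeMap` plays the role of a temp RocksDB instance, and clearing its sub-views plays the role of `rocksdb.deleteRange()`. This is only a sketch of the idea, not Flink's actual restore code:

```java
import java.util.TreeMap;

// Illustrative model of clipping a temp db to the target key-group range
// [rangeStart, rangeEnd): everything outside the range is dropped up front,
// so the later bulk-insert loop needs no per-record key-group check.
final class RangeClipSketch {
    static void clipToRange(TreeMap<Integer, byte[]> db, int rangeStart, int rangeEnd) {
        db.headMap(rangeStart, false).clear(); // drop keys <  rangeStart
        db.tailMap(rangeEnd, true).clear();    // drop keys >= rangeEnd
    }
}
```

In the real backend, a single range deletion replaces millions of per-record checks, which is where the speedup comes from.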
[jira] [Commented] (FLINK-8720) Logging exception with S3 connector and BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-8720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378844#comment-16378844 ] Stephan Ewen commented on FLINK-8720: - The proper fix is coming, tracked in FLINK-8798 > Logging exception with S3 connector and BucketingSink > - > > Key: FLINK-8720 > URL: https://issues.apache.org/jira/browse/FLINK-8720 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.1 >Reporter: dejan miljkovic >Priority: Critical > > Trying to stream data to S3. The code works from IntelliJ. When submitting the job > through the UI on my machine (single-node cluster started by the start-cluster.sh > script), the stack trace below is produced. > > Below is the link to a simple test app that streams data to S3. > [https://github.com/dmiljkovic/test-flink-bucketingsink-s3] > The behavior is a bit different but the same error is produced. The job works only > once. If the job is submitted a second time, the stack trace below is produced. If I > restart the cluster, the job works, but only for the first time. 
> > > org.apache.commons.logging.LogConfigurationException: > java.lang.IllegalAccessError: > org/apache/commons/logging/impl/LogFactoryImpl$3 (Caused by > java.lang.IllegalAccessError: > org/apache/commons/logging/impl/LogFactoryImpl$3) > at > org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637) > at > org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336) > at > org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310) > at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685) > at > org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:76) > at > org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:102) > at > org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:88) > at > org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:96) > at > com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26) > at > com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96) > at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:158) > at > com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119) > at > com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:389) > at > com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:371) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355) > at > 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IllegalAccessError: > org/apache/commons/logging/impl/LogFactoryImpl$3 > at > org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700) > at > org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187) > at > org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914) > at > org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604) > ... 26 more > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
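The fix tracked in FLINK-8798 amounts to adding `org.apache.commons.logging` to the set of class-name prefixes that a child-first classloader always delegates to its parent. A minimal sketch of that pattern check follows — the pattern list here is illustrative, not Flink's exact default list:

```java
// Sketch of the "parent-first pattern" rule used by child-first class
// loading: a class whose fully-qualified name starts with one of these
// prefixes is always resolved by the parent classloader; everything else
// is looked up child-first in the user-code jar.
final class ParentFirstPatternSketch {
    private static final String[] PARENT_FIRST_PATTERNS = {
        "java.", "org.apache.flink.", "org.slf4j", "org.apache.commons.logging"
    };

    static boolean loadParentFirst(String className) {
        for (String pattern : PARENT_FIRST_PATTERNS) {
            if (className.startsWith(pattern)) {
                return true;  // delegate to the parent classloader
            }
        }
        return false;  // resolve child-first from the user-code jar
    }
}
```

Forcing `commons-logging` parent-first means `LogFactoryImpl` and its inner classes all come from the same loader, which avoids the `IllegalAccessError` in the trace above.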
[jira] [Created] (FLINK-8798) Make commons-logging a parent-first pattern
Stephan Ewen created FLINK-8798: --- Summary: Make commons-logging a parent-first pattern Key: FLINK-8798 URL: https://issues.apache.org/jira/browse/FLINK-8798 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.4.1 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.5.0, 1.4.2, 1.6.0 The Apache {{commons-logging}} framework does not play well with child-first classloading. We need to make this a parent-first pattern. As a matter of fact, other frameworks that use inverted classloading (JBoss, Tomcat) force this library to be always parent-first as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5500 If this security concern is an issue, maybe in that case `getCurrentKey()` should take `object-reuse` into account, and if `object-reuse` is set to false, make a defensive copy of the key on access? However @aljoscha was not objecting to the general idea in the other PR: https://github.com/apache/flink/pull/5481 ---
[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()
[ https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378819#comment-16378819 ] ASF GitHub Bot commented on FLINK-8667: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5500 If this security concern is an issue, maybe in that case `getCurrentKey()` should take `object-reuse` into account, and if `object-reuse` is set to false, make a defensive copy of the key on access? However @aljoscha was not objecting to the general idea in the other PR: https://github.com/apache/flink/pull/5481 > expose key in KeyedBroadcastProcessFunction#onTimer() > - > > Key: FLINK-8667 > URL: https://issues.apache.org/jira/browse/FLINK-8667 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > [~aljoscha] [~pnowojski] > Since KeyedBroadcastProcessFunction is about to get out of the door, I think > it will be great to expose the timer's key in KeyedBroadcastProcessFunction > too. If we don't do it now, it will be much more difficult to add the feature > later because of user app compatibility issues. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()
[ https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378818#comment-16378818 ] ASF GitHub Bot commented on FLINK-8667: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5500 If also @aljoscha is ok, I can review this PR and merge it. > expose key in KeyedBroadcastProcessFunction#onTimer() > - > > Key: FLINK-8667 > URL: https://issues.apache.org/jira/browse/FLINK-8667 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > [~aljoscha] [~pnowojski] > Since KeyedBroadcastProcessFunction is about to get out of the door, I think > it will be great to expose the timer's key in KeyedBroadcastProcessFunction > too. If we don't do it now, it will be much more difficult to add the feature > later because of user app compatibility issues. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
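The proposal in FLINK-8667 can be modeled with a toy context interface. To keep this self-contained, the interfaces below are illustrative stand-ins, not Flink's real `KeyedBroadcastProcessFunction.OnTimerContext` API — the point is only that the timer callback's context exposes the key the timer was registered under:

```java
// Toy model of "expose key in onTimer()": the context handed to the timer
// callback carries the firing timer's key, so a user function can branch
// on it without keeping its own key bookkeeping. Names are illustrative.
final class OnTimerKeySketch {
    interface OnTimerContext<K> {
        K getCurrentKey();   // the key whose timer fired
        long timestamp();    // the timer's timestamp
    }

    // a user function can now act per key inside onTimer()
    static <K> String onTimer(OnTimerContext<K> ctx) {
        return "timer@" + ctx.timestamp() + " for key " + ctx.getCurrentKey();
    }
}
```

The defensive-copy discussion above would apply inside `getCurrentKey()`: with object reuse disabled, the implementation could hand out a copy rather than the internal key object.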
[jira] [Closed] (FLINK-8360) Implement task-local state recovery
[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-8360. - Resolution: Fixed > Implement task-local state recovery > --- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky as possible. > For starters, we will implement this feature for all managed keyed states and > can easily extend it to all other state types (e.g. operator state) later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
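The recovery preference described above — try the secondary local copy first, fall back to the primary copy in DFS — reduces to a simple decision. The sketch below is only an illustration of that fallback; the names are hypothetical and Flink's real logic lives inside the state backends:

```java
import java.util.Optional;
import java.util.function.Supplier;

// Sketch of task-local recovery (FLINK-8360): restore from the local
// snapshot when one survived on this TaskManager, otherwise download the
// primary copy from DFS. Sticky task-to-slot assignment maximizes the
// chance that the local branch is taken.
final class LocalRecoverySketch {
    static <S> S restore(Optional<S> localSnapshot, Supplier<S> remoteDownload) {
        // the local copy saves the network transfer when the task kept its slot
        return localSnapshot.orElseGet(remoteDownload);
    }
}
```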
[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5500 If also @aljoscha is ok, I can review this PR and merge it. ---
[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.
[ https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378805#comment-16378805 ] Aljoscha Krettek commented on FLINK-7756: - [~shashank734] Not right now. But thanks for providing the example, we're having a look at it and are trying to figure out what's going wrong. > RocksDB state backend Checkpointing (Async and Incremental) is not working > with CEP. > - > > Key: FLINK-7756 > URL: https://issues.apache.org/jira/browse/FLINK-7756 > Project: Flink > Issue Type: Sub-task > Components: CEP, State Backends, Checkpointing, Streaming >Affects Versions: 1.4.0, 1.3.2 > Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend >Reporter: Shashank Agarwal >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: jobmanager.log, jobmanager_without_cassandra.log, > taskmanager.log, taskmanager_without_cassandra.log > > > When I try to use RocksDBStateBackend on my staging cluster (which is using > HDFS as file system) it crashes, but when I use FsStateBackend on staging > (which is using HDFS as file system) it works fine. > Locally, with the local file system, it works fine in both cases. > Please check attached logs. I have around 20-25 tasks in my app. > {code:java} > 2017-09-29 14:21:31,639 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=0). > 2017-09-29 14:21:31,640 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,020 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=1). > 2017-09-29 14:21:32,022 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. 
> 2017-09-29 14:21:32,078 INFO com.datastax.driver.core.NettyUtil > - Found Netty's native epoll transport in the classpath, using > it > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (1/2) > (b879f192c4e8aae6671cdafb3a24c00a). > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Map (2/2) > (1ea5aef6ccc7031edc6b37da2912d90b). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (2/2) > (4bac8e764c67520d418a4c755be23d4d). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched > from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 > for operator Co-Flat Map (1/2).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator Co-Flat Map (1/2). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) >
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378802#comment-16378802 ] ASF GitHub Bot commented on FLINK-6352: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r170966200 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -621,12 +621,70 @@ public void runStartFromSpecificOffsets() throws Exception { partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49 partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0)); // partition 3 should read offset 0-49 - readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets); + readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, null, readProps, topicName, partitionsToValueCountAndStartOffsets); kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + /** +* This test ensures that the consumer correctly uses the user-supplied timestamp when explicitly configured to +* start from timestamp. +* +* The validated Kafka data is written in 2 steps: first, an initial 50 records are written to each partition. +* After that, another 30 records are appended to each partition. Before each step, a timestamp is recorded. +* For the validation, when the read job is configured to start from the first timestamp, each partition should start +* from offset 0 and read a total of 80 records. When configured to start from the second timestamp, +* each partition should start from offset 50 and read the remaining 30 appended records. 
+*/ + public void runStartFromTimestamp() throws Exception { + // 4 partitions with 50 records each + final int parallelism = 4; + final int initialRecordsInEachPartition = 50; + final int appendRecordsInEachPartition = 30; + + long firstTimestamp = 0; + long secondTimestamp = 0; + String topic = ""; + + // attempt to create an appended test sequence, where the timestamp of writing the appended sequence + // is assured to be larger than the timestamp of the original sequence. + final int maxRetries = 3; + int attempt = 0; + while (attempt != maxRetries) { + firstTimestamp = System.currentTimeMillis(); + topic = writeSequence("runStartFromTimestamp", initialRecordsInEachPartition, parallelism, 1); --- End diff -- Ah, I just thought that we could have a simple loop there: ``` long secondTimestamp = System.currentTimeMillis(); while (secondTimestamp <= firstTimestamp) { Thread.sleep(); secondTimestamp = System.currentTimeMillis(); } ``` what do you think? > FlinkKafkaConsumer should support to use timestamp to set up start offset > - > > Key: FLINK-6352 > URL: https://issues.apache.org/jira/browse/FLINK-6352 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Blocker > Fix For: 1.5.0 > > > Currently "auto.offset.reset" is used to initialize the start offset of > FlinkKafkaConsumer, and the value should be earliest/latest/none. This method > can only let the job consume the beginning or the most recent data, but can > not specify the exact offset at which Kafka consumption should begin. > So, there should be a configuration item (such as > "flink.source.start.time" and the format is "-MM-dd HH:mm:ss") that > allows the user to configure the initial offset of Kafka. The action of > "flink.source.start.time" is as follows: > 1) job start from checkpoint / savepoint > a> offset of partition can be restored from checkpoint/savepoint, > "flink.source.start.time" will be ignored. 
> b> there's no checkpoint/savepoint for the partition (For example, this > partition is newly increased), the "flink.kafka.start.time" will be used to > initialize the offset of the partition > 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used > to initialize the offset of the kafka > a> the "flink.source.start.time" is valid, use it to set the offset of kafka > b> the "flink.source.start.time" is out-of-range, the same as it does > currently with no
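The startup semantics described in points 1) and 2) boil down to: for each partition, find the offset of the earliest record whose timestamp is at or after the configured start timestamp, and fall back to the "latest" offset when the timestamp is out of range (case 2b). A self-contained sketch of that lookup, with a `TreeMap` standing in for Kafka's timestamp index (this is an illustration of the semantics, not the connector's code):

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of timestamp-based start offsets (FLINK-6352): given a partition's
// timestamp -> offset index, the start offset is the offset of the first
// record with timestamp >= startTimestamp; if no such record exists, the
// consumer starts from the latest offset instead.
final class TimestampOffsetSketch {
    static long startOffset(TreeMap<Long, Long> timestampToOffset, long startTimestamp, long latestOffset) {
        Map.Entry<Long, Long> entry = timestampToOffset.ceilingEntry(startTimestamp);
        return entry != null ? entry.getValue() : latestOffset;  // out-of-range -> latest
    }
}
```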
[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r170966200 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -621,12 +621,70 @@ public void runStartFromSpecificOffsets() throws Exception { partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49 partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0)); // partition 3 should read offset 0-49 - readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets); + readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, null, readProps, topicName, partitionsToValueCountAndStartOffsets); kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + /** +* This test ensures that the consumer correctly uses the user-supplied timestamp when explicitly configured to +* start from timestamp. +* +* The validated Kafka data is written in 2 steps: first, an initial 50 records are written to each partition. +* After that, another 30 records are appended to each partition. Before each step, a timestamp is recorded. +* For the validation, when the read job is configured to start from the first timestamp, each partition should start +* from offset 0 and read a total of 80 records. When configured to start from the second timestamp, +* each partition should start from offset 50 and read the remaining 30 appended records. 
+*/ + public void runStartFromTimestamp() throws Exception { + // 4 partitions with 50 records each + final int parallelism = 4; + final int initialRecordsInEachPartition = 50; + final int appendRecordsInEachPartition = 30; + + long firstTimestamp = 0; + long secondTimestamp = 0; + String topic = ""; + + // attempt to create an appended test sequence, where the timestamp of writing the appended sequence + // is assured to be larger than the timestamp of the original sequence. + final int maxRetries = 3; + int attempt = 0; + while (attempt != maxRetries) { + firstTimestamp = System.currentTimeMillis(); + topic = writeSequence("runStartFromTimestamp", initialRecordsInEachPartition, parallelism, 1); --- End diff -- Ah, I just thought that we could have a simple loop there: ``` long secondTimestamp = System.currentTimeMillis(); while (secondTimestamp <= firstTimestamp) { Thread.sleep(); secondTimestamp = System.currentTimeMillis(); } ``` what do you think? ---
[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover
[ https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378796#comment-16378796 ] ASF GitHub Bot commented on FLINK-8777: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r170965682 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -126,30 +133,54 @@ public void storeLocalState( LOG.info("Storing local state for checkpoint {}.", checkpointId); LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState); - Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16); + Map.Entry<Long, TaskStateSnapshot> toDiscard = null; synchronized (lock) { if (disposed) { // we ignore late stores and simply discard the state. - toDiscard.put(checkpointId, localState); + toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, localState); } else { TaskStateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState); if (previous != null) { - toDiscard.put(checkpointId, previous); + toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, previous); } } } - asyncDiscardLocalStateForCollection(toDiscard.entrySet()); + if (toDiscard != null) { + asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard)); + } } @Override @Nullable public TaskStateSnapshot retrieveLocalState(long checkpointID) { synchronized (lock) { TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID); + + Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator = --- End diff -- addressed. 
> improve resource release when recovery from failover > > > Key: FLINK-8777 > URL: https://issues.apache.org/jira/browse/FLINK-8777 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} > will be invoked; we can release every entry from > {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID > == checkpointID}}, which prevents a resource leak when the job loops through > {{local checkpoint completed => failed => local checkpoint completed => > failed ...}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
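The pruning described in FLINK-8777 can be sketched with plain collections. Types are deliberately simplified (a `String` stands in for `TaskStateSnapshot`, and discarding is synchronous here, whereas Flink discards asynchronously) — this only illustrates the cleanup rule:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the FLINK-8777 cleanup: when a checkpoint is retrieved for
// recovery, every stored snapshot belonging to a *different* checkpoint id
// is released, so repeated "local checkpoint completed => failed" cycles
// cannot accumulate stale local state.
final class LocalStateStoreSketch {
    final Map<Long, String> storedByCheckpointId = new HashMap<>();

    String retrieveLocalState(long checkpointId) {
        // drop everything that does not belong to the checkpoint being restored
        storedByCheckpointId.keySet().removeIf(id -> id != checkpointId);
        return storedByCheckpointId.get(checkpointId);
    }
}
```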