[GitHub] flink pull request #5074: [FLINK-7873] [runtime] Introduce local recovery

2018-02-27 Thread sihuazhou
Github user sihuazhou closed the pull request at:

https://github.com/apache/flink/pull/5074


---


[jira] [Commented] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379918#comment-16379918
 ] 

ASF GitHub Bot commented on FLINK-7873:
---

Github user sihuazhou closed the pull request at:

https://github.com/apache/flink/pull/5074


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from the remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is very large 
> (e.g. 1 TB). What's worse, if the job performs recovery again and again, it 
> can eat up all network bandwidth and badly hurt the cluster. So I propose 
> that we cache the checkpoint data locally and read checkpoint data from the 
> local cache whenever we can, reading from the remote storage only if we fail 
> locally. The advantage is that if an execution is assigned to the same 
> TaskManager as before, it can save a lot of bandwidth and recover faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like method to manage cache entry disposal: we dispose of 
> an entry if it wasn't touched for some time X, and once we touch an entry we 
> reset its TTL. This way, all the work is done by the TaskManager and is 
> transparent to the JobManager. The only problem is that we may dispose of an 
> entry that is still useful; in that case we have to read from the remote data 
> after all, but users can avoid this by setting a proper TTL value according 
> to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~
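
A minimal sketch of the TTL-style disposal described above (all names are 
illustrative, not an actual Flink API):

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CheckpointCacheSketch {

    private final Map<Long, Long> lastAccessByCheckpointId = new ConcurrentHashMap<>();
    private final long ttlMillis;

    CheckpointCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Touching an entry (e.g. on recovery) resets its TTL.
    void touch(long checkpointId) {
        lastAccessByCheckpointId.put(checkpointId, System.currentTimeMillis());
    }

    // Called periodically on the TaskManager: dispose entries that were not
    // touched for ttlMillis; a disposed-but-needed entry falls back to remote.
    void expireStaleEntries() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<Long, Long>> it = lastAccessByCheckpointId.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() > ttlMillis) {
                it.remove(); // also delete the cached checkpoint files here
            }
        }
    }
}
{code}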



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

2018-02-27 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-7873.
-
  Resolution: Duplicate
Release Note: fixed by Stefan.

> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> -
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from the remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is very large 
> (e.g. 1 TB). What's worse, if the job performs recovery again and again, it 
> can eat up all network bandwidth and badly hurt the cluster. So I propose 
> that we cache the checkpoint data locally and read checkpoint data from the 
> local cache whenever we can, reading from the remote storage only if we fail 
> locally. The advantage is that if an execution is assigned to the same 
> TaskManager as before, it can save a lot of bandwidth and recover faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like method to manage cache entry disposal: we dispose of 
> an entry if it wasn't touched for some time X, and once we touch an entry we 
> reset its TTL. This way, all the work is done by the TaskManager and is 
> transparent to the JobManager. The only problem is that we may dispose of an 
> entry that is still useful; in that case we have to read from the remote data 
> after all, but users can avoid this by setting a proper TTL value according 
> to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8160) Extend OperatorHarness to expose metrics

2018-02-27 Thread Tuo Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tuo Wang reassigned FLINK-8160:
---

Assignee: Tuo Wang

> Extend OperatorHarness to expose metrics
> 
>
> Key: FLINK-8160
> URL: https://issues.apache.org/jira/browse/FLINK-8160
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Streaming
>Reporter: Chesnay Schepler
>Assignee: Tuo Wang
>Priority: Major
> Fix For: 1.5.0
>
>
> To better test interactions between operators and metrics the harness should 
> expose the metrics registered by the operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-02-27 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8794:
--
Description: 
When using BucketingSink, one of the files is always stuck in the 
[.in-progress] state, and this state never changes afterwards. The 
underlying storage is S3.

 
{code:java}
// code placeholder
{code}
2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}

2018-02-28 12:06:27  147315059 part-0-0

2018-02-28 12:06:27  147462359 part-1-0

2018-02-28 12:06:27  147316006 part-10-0

2018-02-28 12:06:28  147349854 part-100-0

2018-02-28 12:06:27  147421625 part-101-0

2018-02-28 12:06:27  147443830 part-102-0

2018-02-28 12:06:27  147372801 part-103-0

2018-02-28 12:06:27  147343670 part-104-0

..

  was:
When using BucketingSink, one of the files is always stuck in the 
[.in-progress] state, and this state never changes afterwards. The 
underlying storage is S3.

 
{code:java}
// code placeholder
{code}
2018-02-28 11:58:42  147341619 _part-28-0.in-progress

2018-02-28 12:06:27  147315059 part-0-0

2018-02-28 12:06:27  147462359 part-1-0

2018-02-28 12:06:27  147316006 part-10-0

2018-02-28 12:06:28  147349854 part-100-0

2018-02-28 12:06:27  147421625 part-101-0

2018-02-28 12:06:27  147443830 part-102-0

2018-02-28 12:06:27  147372801 part-103-0

2018-02-28 12:06:27  147343670 part-104-0

..


> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, one of the files is always stuck in the 
> [.in-progress] state, and this state never changes afterwards. The 
> underlying storage is S3.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-02-27 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8794:
--
Description: 
When using BucketingSink, one of the files is always stuck in the 
[.in-progress] state, and this state never changes afterwards. The 
underlying storage is S3.

 
{code:java}
// code placeholder
{code}
2018-02-28 11:58:42  147341619 _part-28-0.in-progress

2018-02-28 12:06:27  147315059 part-0-0

2018-02-28 12:06:27  147462359 part-1-0

2018-02-28 12:06:27  147316006 part-10-0

2018-02-28 12:06:28  147349854 part-100-0

2018-02-28 12:06:27  147421625 part-101-0

2018-02-28 12:06:27  147443830 part-102-0

2018-02-28 12:06:27  147372801 part-103-0

2018-02-28 12:06:27  147343670 part-104-0

..

  was: When using BucketingSink, one of the files is always stuck in the 
[.in-progress] state, and this state never changes afterwards. The 
underlying storage is S3.


> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, one of the files is always stuck in the 
> [.in-progress] state, and this state never changes afterwards. The 
> underlying storage is S3.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 _part-28-0.in-progress
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-02-27 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379815#comment-16379815
 ] 

Thomas Weise edited comment on FLINK-5697 at 2/28/18 5:52 AM:
--

We have implemented periodic watermark support in a customization of 
FlinkKinesisConsumer via FLINK-8648. The consumer accepts an optional instance 
of AssignerWithPeriodicWatermarks and uses the configuration from 
ExecutionConfig#setAutoWatermarkInterval for the timer interval. It also 
addresses the issue described in FLINK-5479 with an (optional) interval 
property for the user to specify after how much time since the last record a 
shard is considered idle and should not hold back the watermark. If there is 
interest, I would contribute these changes to the current Flink Kinesis 
connector. 


was (Author: thw):
We have implemented periodic watermark support in a customization of 
FlinkKinesisConsumer via FLINK-8648. The consumer accepts an optional instance 
of AssignerWithPeriodicWatermarks and uses the configuration from 
ExecutionConfig#setAutoWatermarkInterval for the timer interval. It also 
addresses the issue described in FLINK-5479 with an (optional) interval 
property for the user to specify after how much time since the last a shard is 
considered idle and should not hold back the watermark. If there is interest, I 
would contribute these changes to the current Flink Kinesis connector. 

> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> It would be nice to let the Kinesis consumer be on-par in functionality with 
> the Kafka consumer, since they share very similar abstractions. Per-partition 
> / shard watermarks is something we can add also to the Kinesis consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5697) Add per-shard watermarks for FlinkKinesisConsumer

2018-02-27 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379815#comment-16379815
 ] 

Thomas Weise commented on FLINK-5697:
-

We have implemented periodic watermark support in a customization of 
FlinkKinesisConsumer via FLINK-8648. The consumer accepts an optional instance 
of AssignerWithPeriodicWatermarks and uses the configuration from 
ExecutionConfig#setAutoWatermarkInterval for the timer interval. It also 
addresses the issue described in FLINK-5479 with an (optional) interval 
property for the user to specify after how much time since the last record a shard is 
considered idle and should not hold back the watermark. If there is interest, I 
would contribute these changes to the current Flink Kinesis connector. 
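
For reference, a minimal sketch of such a periodic assigner (the element type 
and class name are illustrative; the per-shard wiring inside the consumer is 
part of the proposed customization, not released API):

{code:java}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Tracks the max timestamp seen and emits it (minus 1) as the watermark;
// getCurrentWatermark() is invoked at the rate configured via
// ExecutionConfig#setAutoWatermarkInterval().
public class MaxTimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {

    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Long eventTime, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTime);
        return eventTime;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // null means "no watermark yet"
        return maxTimestamp == Long.MIN_VALUE ? null : new Watermark(maxTimestamp - 1);
    }
}
{code}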

> Add per-shard watermarks for FlinkKinesisConsumer
> -
>
> Key: FLINK-5697
> URL: https://issues.apache.org/jira/browse/FLINK-5697
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> It would be nice to let the Kinesis consumer be on-par in functionality with 
> the Kafka consumer, since they share very similar abstractions. Per-partition 
> / shard watermarks is something we can add also to the Kinesis consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5578: [FLINK-8777][state]Improve resource release for local rec...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5578
  
@StefanRRichter I have addressed your suggestions, except the one about 
making the `if` a bit more complex; instead, I introduced a `Predicate` for 
`pruneCheckpoints()`. I'm not sure whether that is OK with you; if you are 
still against it, I'd be happy to change the code to make the `if` a bit more 
complex instead.
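
A sketch (not the PR's actual code) of what passing a predicate into 
`pruneCheckpoints()` could look like, assuming `storedTaskStateByCheckpointID` 
maps checkpoint IDs to snapshots:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongPredicate;

class PruneSketch {

    private final Map<Long, Object> storedTaskStateByCheckpointID = new TreeMap<>();

    // The caller supplies the pruning condition instead of a complex `if`.
    private void pruneCheckpoints(LongPredicate shouldPrune, boolean breakTheIteration) {
        List<Long> toRemove = new ArrayList<>();
        Iterator<Map.Entry<Long, Object>> it = storedTaskStateByCheckpointID.entrySet().iterator();
        while (it.hasNext()) {
            long entryCheckpointId = it.next().getKey();
            if (shouldPrune.test(entryCheckpointId)) {
                toRemove.add(entryCheckpointId);
            } else if (breakTheIteration) {
                break; // e.g. stop at the first retained checkpoint
            }
        }
        for (Long id : toRemove) {
            storedTaskStateByCheckpointID.remove(id); // also discard the snapshot's state
        }
    }

    // Caller side: keep only the retrieved checkpoint.
    void retrieveLocalState(long checkpointID) {
        pruneCheckpoints(id -> id != checkpointID, false);
    }
}
```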


---


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379806#comment-16379806
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5578
  
@StefanRRichter I have addressed your suggestions, except the one about 
making the `if` a bit more complex; instead, I introduced a `Predicate` for 
`pruneCheckpoints()`. I'm not sure whether that is OK with you; if you are 
still against it, I'd be happy to change the code to make the `if` a bit more 
complex instead.


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> is invoked; we can release every entry from {{storedTaskStateByCheckpointID}} 
> that does not satisfy {{entry.checkpointID == checkpointID}}. This prevents a 
> resource leak when the job loops through {{local checkpoint completed => 
> failed => local checkpoint completed => failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379780#comment-16379780
 ] 

ASF GitHub Bot commented on FLINK-7836:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5593
  
@tillrohrmann could you review this PR? Thanks!


> specifying node label for flink job to run on yarn
> --
>
> Key: FLINK-7836
> URL: https://issues.apache.org/jira/browse/FLINK-7836
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.3.2
>Reporter: zhaibaba
>Assignee: vinoyang
>Priority: Major
>
> flink client cannot specify node label for flink job to run on yarn



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8792) bad semantic of method name MessageQueryParameter.convertStringToValue

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379779#comment-16379779
 ] 

ASF GitHub Bot commented on FLINK-8792:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5587
  
@tillrohrmann could you review this PR? Thanks!


>  bad semantic of method name MessageQueryParameter.convertStringToValue 
> 
>
> Key: FLINK-8792
> URL: https://issues.apache.org/jira/browse/FLINK-8792
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> the method name
> {code:java}
> MessageQueryParameter.convertStringToValue
> {code}
> should be 
> {code:java}
> convertValueToString
> {code}
> or
> {code:java}
> convertStringFromValue{code}
>  I think 
> {code:java}
> convertValueToString
> {code}
> would be better.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5593: [FLINK-7836][Client] specifying node label for flink job ...

2018-02-27 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5593
  
@tillrohrmann could you review this PR? Thanks!


---


[GitHub] flink issue #5587: [FLINK-8792][REST] bad semantic of method name MessageQue...

2018-02-27 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5587
  
@tillrohrmann could you review this PR? Thanks!


---


[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-02-27 Thread Wind (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379777#comment-16379777
 ] 

Wind commented on FLINK-6847:
-

Can you assign it to me if possible?

> Add TIMESTAMPDIFF supported in TableAPI
> ---
>
> Key: FLINK-6847
> URL: https://issues.apache.org/jira/browse/FLINK-6847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: starter
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-02-27 Thread Wind (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wind updated FLINK-6847:

Comment: was deleted

(was: Can you assign it to me if possible?)

> Add TIMESTAMPDIFF supported in TableAPI
> ---
>
> Key: FLINK-6847
> URL: https://issues.apache.org/jira/browse/FLINK-6847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: starter
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7470) Acquire RM leadership before registering with Mesos

2018-02-27 Thread Bertrand Bossy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379756#comment-16379756
 ] 

Bertrand Bossy commented on FLINK-7470:
---

Is there any update on this? FLIP-6 mode for Mesos seems to have severe issues 
due to that.

> Acquire RM leadership before registering with Mesos
> ---
>
> Key: FLINK-7470
> URL: https://issues.apache.org/jira/browse/FLINK-7470
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Eron Wright 
>Priority: Major
>
> Mesos doesn't support fencing tokens in the scheduler protocol; it assumes 
> external leader election among scheduler instances.   The last connection 
> wins; prior connections for a given framework ID are closed.
> The Mesos RM should not register as a framework until it has acquired RM 
> leadership.   Evolve the ResourceManager as necessary.   One option is to 
> introduce an ResourceManagerRunner that acquires leadership before starting 
> the RM.
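
A hypothetical sketch of the runner idea (all types and names here are 
illustrative, not existing Flink classes):

{code:java}
// Illustrative only: gate framework registration behind leader election.
interface LeaderElection {
    // Invokes the callback once (and only if) this instance wins leadership.
    void start(Runnable onLeadershipGranted);
}

final class ResourceManagerRunner {

    private final LeaderElection leaderElection;

    ResourceManagerRunner(LeaderElection leaderElection) {
        this.leaderElection = leaderElection;
    }

    void run() {
        // Do NOT connect to the Mesos master here; the last scheduler
        // connection for a framework ID wins, so registering early could
        // kick out the actual leader.
        leaderElection.start(this::startResourceManagerAndRegister);
    }

    private void startResourceManagerAndRegister() {
        // start the RM components, then register the framework with Mesos
    }
}
{code}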



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7836) specifying node label for flink job to run on yarn

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379722#comment-16379722
 ] 

ASF GitHub Bot commented on FLINK-7836:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5593

[FLINK-7836][Client] specifying node label for flink job to run on yarn

## What is the purpose of the change

*This pull request adds support for specifying a YARN node label for 
Flink-on-YARN applications, in both Flink session and single-job mode*


## Brief change log

  - *define a new command option to specify a YARN node label*
  - *set the node label for the application through reflection, to support 
low-version Hadoop (see the sketch below)*
  - *add a command line option description*
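
As an illustration, a hedged sketch of the reflection-based setter mentioned 
in the change log (class, wrapper method and variable names are illustrative, 
not the PR's actual code; `setNodeLabelExpression` is the Hadoop 2.6+ API 
being invoked):

```java
import java.lang.reflect.Method;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;

final class NodeLabelSetter {

    // Invoke setNodeLabelExpression reflectively so the client still runs
    // against older Hadoop versions that do not have this setter.
    static void trySetNodeLabel(ApplicationSubmissionContext appContext, String nodeLabel) {
        try {
            Method setter = appContext.getClass()
                .getMethod("setNodeLabelExpression", String.class);
            setter.invoke(appContext, nodeLabel);
        } catch (NoSuchMethodException e) {
            // Hadoop version without node-label support: skip, or log a warning.
        } catch (Exception e) {
            throw new RuntimeException("Failed to set YARN node label", e);
        }
    }
}
```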


## Verifying this change

This change is not covered by any tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-7836

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5593.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5593


commit 5f7342716223adea6ec1745d319a44a2bd3188da
Author: vinoyang 
Date:   2018-02-28T03:02:07Z

[FLINK-7836][Client] specifying node label for flink job to run on yarn




> specifying node label for flink job to run on yarn
> --
>
> Key: FLINK-7836
> URL: https://issues.apache.org/jira/browse/FLINK-7836
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.3.2
>Reporter: zhaibaba
>Assignee: vinoyang
>Priority: Major
>
> flink client cannot specify node label for flink job to run on yarn



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5593: [FLINK-7836][Client] specifying node label for fli...

2018-02-27 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5593

[FLINK-7836][Client] specifying node label for flink job to run on yarn

## What is the purpose of the change

*This pull request adds support for specifying a YARN node label for 
Flink-on-YARN applications, in both Flink session and single-job mode*


## Brief change log

  - *define a new command option to specify a YARN node label*
  - *set the node label for the application through reflection, to support 
low-version Hadoop*
  - *add a command line option description*


## Verifying this change

This change is not covered by any tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-7836

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5593.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5593


commit 5f7342716223adea6ec1745d319a44a2bd3188da
Author: vinoyang 
Date:   2018-02-28T03:02:07Z

[FLINK-7836][Client] specifying node label for flink job to run on yarn




---


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379717#comment-16379717
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171133066
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
		}
	}
 
+	/**
+	 * Pruning the useless checkpoints.
+	 */
+	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
+		Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+			storedTaskStateByCheckpointID.entrySet().iterator();
+
+		final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+		while (entryIterator.hasNext()) {
+
+			Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+			long entryCheckpointId = snapshotEntry.getKey();
+
+			if (entryCheckpointId != checkpointID) {
--- End diff --

I agree with you that the breaking case looks a bit dangerous ... I think 
maybe we could pass a `Predicate` into this function for the `if` and let the 
caller side provide it. That would make it cleaner from the caller's side, and 
we wouldn't need to cram extra logic into the `if` and make it complex.


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> is invoked; we can release every entry from {{storedTaskStateByCheckpointID}} 
> that does not satisfy {{entry.checkpointID == checkpointID}}. This prevents a 
> resource leak when the job loops through {{local checkpoint completed => 
> failed => local checkpoint completed => failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171133066
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
		}
	}
 
+	/**
+	 * Pruning the useless checkpoints.
+	 */
+	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
+		Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+			storedTaskStateByCheckpointID.entrySet().iterator();
+
+		final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+		while (entryIterator.hasNext()) {
+
+			Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+			long entryCheckpointId = snapshotEntry.getKey();
+
+			if (entryCheckpointId != checkpointID) {
--- End diff --

I agree with you that the breaking case looks a bit dangerous ... I think 
maybe we could pass a `Predicate` into this function for the `if` and let the 
caller side provide it. That would make it cleaner from the caller's side, and 
we wouldn't need to cram extra logic into the `if` and make it complex.


---


[GitHub] flink pull request #5592: [hotfix] fix javadoc link of ClusterClient#trigger...

2018-02-27 Thread Matrix42
GitHub user Matrix42 opened a pull request:

https://github.com/apache/flink/pull/5592

[hotfix] fix javadoc link of ClusterClient#triggerSavepoint

## What is the purpose of the change

fix javadoc link of ClusterClient#triggerSavepoint

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Matrix42/flink doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5592.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5592


commit 09691377720361cd2d05007371dbc579aa876bc0
Author: Matrix42 <934336389@...>
Date:   2018-02-28T02:51:42Z

[hotfix] fix javadoc link of ClusterClient#triggerSavepoint




---


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379697#comment-16379697
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130750
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
		}
	}
 
+	/**
+	 * Pruning the useless checkpoints.
+	 */
+	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
--- End diff --

 


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> is invoked; we can release every entry from {{storedTaskStateByCheckpointID}} 
> that does not satisfy {{entry.checkpointID == checkpointID}}. This prevents a 
> resource leak when the job loops through {{local checkpoint completed => 
> failed => local checkpoint completed => failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379696#comment-16379696
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130718
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,9 @@
	@GuardedBy("lock")
	private boolean disposed;
 
+	/** Whether to discard the useless state when retrieving local checkpoint state. */
+	private boolean retrieveWithDiscard = true;
--- End diff --

Aha, this is just for passing the existing test case in 
`TaskLocalStateStoreImplTest` ... 
```java
private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception {
	for (int i = off; i < len; ++i) {
		TaskStateSnapshot expected = history.get(i);
		Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
		Mockito.verify(expected, Mockito.never()).discardState();
	}
}
```


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> is invoked; we can release every entry from {{storedTaskStateByCheckpointID}} 
> that does not satisfy {{entry.checkpointID == checkpointID}}. This prevents a 
> resource leak when the job loops through {{local checkpoint completed => 
> failed => local checkpoint completed => failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130767
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
	TaskStateSnapshot snapshot;
	synchronized (lock) {
		snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+		if (retrieveWithDiscard) {
+			// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
--- End diff --

👍 


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130750
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
		}
	}
 
+	/**
+	 * Pruning the useless checkpoints.
+	 */
+	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
--- End diff --

👍 


---


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379698#comment-16379698
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130767
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
	TaskStateSnapshot snapshot;
	synchronized (lock) {
		snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+		if (retrieveWithDiscard) {
+			// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
--- End diff --

 


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> is invoked; we can release every entry from {{storedTaskStateByCheckpointID}} 
> that does not satisfy {{entry.checkpointID == checkpointID}}. This prevents a 
> resource leak when the job loops through {{local checkpoint completed => 
> failed => local checkpoint completed => failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130718
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,9 @@
	@GuardedBy("lock")
	private boolean disposed;
 
+	/** Whether to discard the useless state when retrieving local checkpoint state. */
+	private boolean retrieveWithDiscard = true;
--- End diff --

Aha, this is just for passing the existing test case in 
`TaskLocalStateStoreImplTest` ... 
```java
private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception {
	for (int i = off; i < len; ++i) {
		TaskStateSnapshot expected = history.get(i);
		Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
		Mockito.verify(expected, Mockito.never()).discardState();
	}
}
```


---


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379673#comment-16379673
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol All right, got your point. That's a problem indeed. 


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.
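
For reference, a sketch of the user-facing flow this builds on, using the 
DistributedCache API as it exists on the batch ExecutionEnvironment (paths and 
names are illustrative; shipping the files via the BlobStore is what this 
ticket adds underneath):

{code:java}
import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CachedFileExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Register a custom artifact; the runtime ships it to all TaskManagers.
        env.registerCachedFile("hdfs:///tmp/my-artifact.txt", "myArtifact");

        env.fromElements(1, 2, 3)
            .map(new RichMapFunction<Integer, String>() {
                @Override
                public String map(Integer value) throws Exception {
                    // Access the shipped file on the TaskManager side.
                    File artifact = getRuntimeContext()
                        .getDistributedCache().getFile("myArtifact");
                    return value + " -> " + artifact.getAbsolutePath();
                }
            })
            .print();
    }
}
{code}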



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

2018-02-27 Thread ifndef-SleePy
Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol All right, got your point. That's a problem indeed. 


---


[jira] [Updated] (FLINK-8708) Unintended integer division in StandaloneThreadedGenerator

2018-02-27 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8708:
--
Description: 
In 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
 :

{code}
double factor = (ts - lastTimeStamp) / 1000;
{code}
Proper casting should be done before the integer division

  was:
In 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
 :
{code}
double factor = (ts - lastTimeStamp) / 1000;
{code}
Proper casting should be done before the integer division


> Unintended integer division in StandaloneThreadedGenerator
> --
>
> Key: FLINK-8708
> URL: https://issues.apache.org/jira/browse/FLINK-8708
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In 
> flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/StandaloneThreadedGenerator.java
>  :
> {code}
> double factor = (ts - lastTimeStamp) / 1000;
> {code}
> Proper casting should be done before the integer division
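
The fix is to force floating-point arithmetic before the division, for example:

{code:java}
long ts = System.currentTimeMillis();
long lastTimeStamp = ts - 1500;

// (ts - lastTimeStamp) / 1000 divides two longs and truncates (1 here);
// dividing by a double literal (or casting first) keeps the fraction (1.5).
double factor = (ts - lastTimeStamp) / 1000.0;
{code}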



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8720) Logging exception with S3 connector and BucketingSink

2018-02-27 Thread dejan miljkovic (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379167#comment-16379167
 ] 

dejan miljkovic commented on FLINK-8720:


I see from FLINK-8798 that a solution is going to be provided in 1.4.2. Do you 
know when 1.4.2 is going to be released?

> Logging exception with S3 connector and BucketingSink
> -
>
> Key: FLINK-8720
> URL: https://issues.apache.org/jira/browse/FLINK-8720
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.1
>Reporter: dejan miljkovic
>Priority: Critical
>
> Trying to stream data to S3. The code works from IntelliJ. When submitting the 
> job through the UI on my machine (a single-node cluster started by the 
> start-cluster.sh script), the stack trace below is produced.
>  
> Below is the link to a simple test app that streams data to S3: 
> [https://github.com/dmiljkovic/test-flink-bucketingsink-s3]
> The behavior is a bit different but the same error is produced. The job works 
> only once: if the job is submitted a second time, the stack trace below is 
> produced. If I restart the cluster, the job works again, but only the first time.
>  
>  
> org.apache.commons.logging.LogConfigurationException: 
> java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3 (Caused by 
> java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310)
>   at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:76)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:102)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:88)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:96)
>   at 
> com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
>   at 
> com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
>   at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:158)
>   at 
> com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:389)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:371)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604)
>   ... 26 more
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-27 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8538.
-
   Resolution: Fixed
Fix Version/s: 1.6.0
   1.5.0

Fixed in 1.6.0: 2450d2b24006a4db846b9b688f9b598e3fdf7c6e & 
d9f2f2f83c8a099e687e158705375db1b56dca01
Fixed in 1.5.0: 1d26062de130c05fdbe7701b55766b4a8d433418 & 
db2c510fb4f171c9e9940759e5fbaf466ec74474

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379148#comment-16379148
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5564


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5564


---


[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379074#comment-16379074
 ] 

ASF GitHub Bot commented on FLINK-8667:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
@tillrohrmann  @kl0u Thanks for reviewing, guys 

As @pnowojski mentioned, we three decided to expose timer keys in 
`ProcessFunction` in [FLINK-8560](https://github.com/apache/flink/pull/5481).  
Exposing timer keys in `KeyedBroadcastProcessFunction` extends that design. I 
think we should get this PR into 1.5.0 so we don't need to do the [complicated 
refactoring for FLINK-8560](https://github.com/apache/flink/pull/5481) to 
support backward compatibility 


> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> [~aljoscha] [~pnowojski]  
> Since KeyedBroadcastProcessFunction is about to get out the door, I think it 
> would be great to expose the timer's key in KeyedBroadcastProcessFunction too. 
> If we don't do it now, it will be much more difficult to add the feature later 
> because of user-app compatibility issues.
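
For illustration, a minimal sketch of what this looks like for the user, 
assuming a {{getCurrentKey()}} accessor on the timer context as in the 
FLINK-8560 design (the function body is illustrative; 
KeyedBroadcastProcessFunction#onTimer would expose the key the same way):

{code:java}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerKeyExample extends KeyedProcessFunction<String, Long, String> {

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
        // Register a timer 60s ahead; it fires scoped to the current key.
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // The key the firing timer was registered under, without the user
        // having to stash it in state themselves.
        out.collect("timer fired for key " + ctx.getCurrentKey());
    }
}
{code}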



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-27 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
@tillrohrmann  @kl0u Thanks for reviewing, guys 

As @pnowojski mentioned, we three decided to expose timer keys in 
`ProcessFunction` in [FLINK-8560](https://github.com/apache/flink/pull/5481).  
Exposing timer keys in `KeyedBroadcastProcessFunction` extends that design. I 
think we should get this PR into 1.5.0 so we don't need to do the [complicated 
refactoring for FLINK-8560](https://github.com/apache/flink/pull/5481) to 
support backward compatibility 


---


[jira] [Commented] (FLINK-8689) Add runtime support of distinct filter using MapView

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379063#comment-16379063
 ] 

ASF GitHub Bot commented on FLINK-8689:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
I agree this design is much cleaner and easier to maintain later. I was 
hesitant to change the function signature of `generateAggregations()`.

I will try to introduce some common building blocks and handle the 
implementation effort in codegen. I imagine a lot of ITCases will be 
needed :-)

Thanks for the feedback @hequn8128 @fhueske 


> Add runtime support of distinct filter using MapView 
> -
>
> Key: FLINK-8689
> URL: https://issues.apache.org/jira/browse/FLINK-8689
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> This ticket should cover distinct aggregate function support to codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}
>  
>  
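
For reference, a hand-written sketch of how a distinct filter can be backed by 
a MapView inside an accumulator (an analogue of what the codegen could 
produce; class and field names are illustrative):

{code:java}
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

public class DistinctSum extends AggregateFunction<Long, DistinctSum.Accumulator> {

    public static class Accumulator {
        public long sum = 0L;
        // Backed by state when used inside a streaming aggregation.
        public MapView<Long, Boolean> seen = new MapView<>();
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    public void accumulate(Accumulator acc, Long value) throws Exception {
        // Only aggregate a value the first time it is observed.
        if (value != null && !acc.seen.contains(value)) {
            acc.seen.put(value, true);
            acc.sum += value;
        }
    }

    @Override
    public Long getValue(Accumulator acc) {
        return acc.sum;
    }
}
{code}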



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5555: [FLINK-8689][table]Add runtime support of distinct filter...

2018-02-27 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/
  
I agree this design is much cleaner and easier to maintain later. I was 
hesitant to change the function signature of `generateAggregations()`.

I will try to introduce some common building blocks and handle the 
implementation effort in codegen. I imagine a lot of ITCases will be 
needed :-)

Thanks for the feedback @hequn8128 @fhueske 


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379044#comment-16379044
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5564
  
Thank you @fhueske. Will merge...


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5564: [FLINK-8538] [table] Add a Kafka table source factory wit...

2018-02-27 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5564
  
Thank you @fhueske. Will merge...


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379026#comment-16379026
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5564
  
Thanks for the update. 
I think this is good to merge.


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5564: [FLINK-8538] [table] Add a Kafka table source factory wit...

2018-02-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5564
  
Thanks for the update. 
I think this is good to merge.


---


[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378993#comment-16378993
 ] 

ASF GitHub Bot commented on FLINK-8750:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171001535
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
@@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) {
	 *
	 * @param buffer the buffer to peak into
	 * @param eventClass the expected class of the event type
-	 * @param classLoader the class loader to use for custom event classes
	 * @return whether the event class of the buffer matches the given eventClass
	 */
-	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader classLoader) throws IOException {
+	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass) throws IOException {
--- End diff --

You should also add a comment, that checking for custom events is not 
supported.


> InputGate may contain data after an EndOfPartitionEvent
> ---
>
> Key: FLINK-8750
> URL: https://issues.apache.org/jira/browse/FLINK-8750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates 
> that there was still some data after an {{EndOfPartitionEvent}} or that 
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  
> Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378989#comment-16378989
 ] 

ASF GitHub Bot commented on FLINK-8750:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171001301
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
@@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, 
Class eventClass, ClassLoad
try {
int type = buffer.getInt();
 
-   switch (type) {
-   case END_OF_PARTITION_EVENT:
-   return 
eventClass.equals(EndOfPartitionEvent.class);
-   case CHECKPOINT_BARRIER_EVENT:
-   return 
eventClass.equals(CheckpointBarrier.class);
-   case END_OF_SUPERSTEP_EVENT:
-   return 
eventClass.equals(EndOfSuperstepEvent.class);
-   case CANCEL_CHECKPOINT_MARKER_EVENT:
-   return 
eventClass.equals(CancelCheckpointMarker.class);
-   case OTHER_EVENT:
-   try {
-   final DataInputDeserializer 
deserializer = new DataInputDeserializer(buffer);
-   final String className = 
deserializer.readUTF();
-
-   final Class clazz;
-   try {
-   clazz = 
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
-   }
-   catch (ClassNotFoundException 
e) {
-   throw new 
IOException("Could not load event class '" + className + "'.", e);
-   }
-   catch (ClassCastException e) {
-   throw new 
IOException("The class '" + className + "' is not a valid subclass of '"
-   + 
AbstractEvent.class.getName() + "'.", e);
-   }
-   return eventClass.equals(clazz);
-   }
-   catch (Exception e) {
-   throw new IOException("Error 
while deserializing or instantiating event.", e);
-   }
-   default:
-   throw new IOException("Corrupt byte 
stream for event");
+   if (eventClass.equals(EndOfPartitionEvent.class)) {
+   return type == END_OF_PARTITION_EVENT;
+   } else if (eventClass.equals(CheckpointBarrier.class)) {
+   return type == CHECKPOINT_BARRIER_EVENT;
+   } else if 
(eventClass.equals(EndOfSuperstepEvent.class)) {
+   return type == END_OF_SUPERSTEP_EVENT;
+   } else if 
(eventClass.equals(CancelCheckpointMarker.class)) {
+   return type == CANCEL_CHECKPOINT_MARKER_EVENT;
+   } else {
+   throw new IOException("Corrupt byte stream for 
event or unsupported eventClass = " + eventClass);
--- End diff --

Actually, this should be an `UnsupportedOperationException` since this is 
only based on the class being given and not the input stream.
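
Applied to the branch above, the suggestion would look roughly like this (sketch, names from the diff):

```
} else if (eventClass.equals(CancelCheckpointMarker.class)) {
    return type == CANCEL_CHECKPOINT_MARKER_EVENT;
} else {
    // the failure stems from the given class, not from a corrupt stream, so an
    // unchecked UnsupportedOperationException fits better than IOException
    throw new UnsupportedOperationException("Unsupported eventClass = " + eventClass);
}
```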


> InputGate may contain data after an EndOfPartitionEvent
> ---
>
> Key: FLINK-8750
> URL: https://issues.apache.org/jira/browse/FLINK-8750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates 
> that there was still some data after an {{EndOfPartitionEvent}} or that 
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  
> Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> 

[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378992#comment-16378992
 ] 

ASF GitHub Bot commented on FLINK-8750:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171000868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
@@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) {
 *
 * @param buffer the buffer to peak into
 * @param eventClass the expected class of the event type
-* @param classLoader the class loader to use for custom event classes
 * @return whether the event class of the buffer matches the 
given eventClass
 */
-   private static boolean isEvent(ByteBuffer buffer, Class eventClass, 
ClassLoader classLoader) throws IOException {
+   private static boolean isEvent(ByteBuffer buffer, Class eventClass) 
throws IOException {
--- End diff --

this change qualifies for a separate JIRA ticket, not just a hotfix


> InputGate may contain data after an EndOfPartitionEvent
> ---
>
> Key: FLINK-8750
> URL: https://issues.apache.org/jira/browse/FLINK-8750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates 
> that there was still some data after an {{EndOfPartitionEvent}} or that 
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  
> Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378994#comment-16378994
 ] 

ASF GitHub Bot commented on FLINK-8750:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171004092
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 ---
@@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception {
}
 
/**
-* Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} 
returns
+* Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
 * the correct answer for various encoded event buffers.
 */
@Test
public void testIsEvent() throws Exception {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
-   EndOfSuperstepEvent.INSTANCE,
new CheckpointBarrier(1678L, 4623784L, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
new TestTaskEvent(Math.random(), 12361231273L),
-   new CancelCheckpointMarker(287087987329842L)
+   new CancelCheckpointMarker(287087987329842L),
+   EndOfSuperstepEvent.INSTANCE
+   };
+
+   Class[] expectedClasses = {
+   EndOfPartitionEvent.class,
+   CheckpointBarrier.class,
+   CancelCheckpointMarker.class,
+   EndOfSuperstepEvent.class
--- End diff --

This extra array seems a bit error-prone and requires maintenance in case 
the events are extended - wouldn't it be equally clear if we used your new 
naming with the original array?
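
A sketch of that suggestion (illustrative; `serializeToBuffer` is a hypothetical test helper, and custom events such as `TestTaskEvent` would still need special handling):

```
for (AbstractEvent event : events) {
    // derive the expected class from the event itself instead of maintaining
    // a parallel array that must stay in lock-step with 'events'
    Class<? extends AbstractEvent> expectedClass = event.getClass();
    Buffer serializedEvent = serializeToBuffer(event); // hypothetical helper
    assertTrue(EventSerializer.isEvent(serializedEvent, expectedClass));
}
```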


> InputGate may contain data after an EndOfPartitionEvent
> ---
>
> Key: FLINK-8750
> URL: https://issues.apache.org/jira/browse/FLINK-8750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates 
> that there was still some data after an {{EndOfPartitionEvent}} or that 
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  
> Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171004092
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 ---
@@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception {
}
 
/**
-* Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} 
returns
+* Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
 * the correct answer for various encoded event buffers.
 */
@Test
public void testIsEvent() throws Exception {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
-   EndOfSuperstepEvent.INSTANCE,
new CheckpointBarrier(1678L, 4623784L, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
new TestTaskEvent(Math.random(), 12361231273L),
-   new CancelCheckpointMarker(287087987329842L)
+   new CancelCheckpointMarker(287087987329842L),
+   EndOfSuperstepEvent.INSTANCE
+   };
+
+   Class[] expectedClasses = {
+   EndOfPartitionEvent.class,
+   CheckpointBarrier.class,
+   CancelCheckpointMarker.class,
+   EndOfSuperstepEvent.class
--- End diff --

This extra array seems a bit error-prone and requires maintenance in case 
the events are extended - wouldn't it be equally clear if we used your new 
naming with the original array?


---


[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378991#comment-16378991
 ] 

ASF GitHub Bot commented on FLINK-8750:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171002283
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 ---
@@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception {
}
 
/**
-* Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} 
returns
+* Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
 * the correct answer for various encoded event buffers.
 */
@Test
public void testIsEvent() throws Exception {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
-   EndOfSuperstepEvent.INSTANCE,
new CheckpointBarrier(1678L, 4623784L, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
new TestTaskEvent(Math.random(), 12361231273L),
-   new CancelCheckpointMarker(287087987329842L)
+   new CancelCheckpointMarker(287087987329842L),
+   EndOfSuperstepEvent.INSTANCE
+   };
--- End diff --

I wonder why the order of the events changed?


> InputGate may contain data after an EndOfPartitionEvent
> ---
>
> Key: FLINK-8750
> URL: https://issues.apache.org/jira/browse/FLINK-8750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates 
> that there was still some data after an {{EndOfPartitionEvent}} or that 
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  
> Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378990#comment-16378990
 ] 

ASF GitHub Bot commented on FLINK-8750:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171001862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
@@ -318,13 +295,9 @@ public static AbstractEvent fromBuffer(Buffer buffer, 
ClassLoader classLoader) t
 *
 * @param buffer the buffer to peak into
 * @param eventClass the expected class of the event type
-* @param classLoader the class loader to use for custom event classes
 * @return whether the event class of the buffer matches the 
given eventClass
 */
-   public static boolean isEvent(final Buffer buffer,
-   final Class eventClass,
-   final ClassLoader classLoader) throws IOException {
-   return !buffer.isBuffer() &&
-   isEvent(buffer.getNioBufferReadable(), eventClass, 
classLoader);
+   public static boolean isEvent(Buffer buffer, Class eventClass) 
throws IOException {
+   return !buffer.isBuffer() && 
isEvent(buffer.getNioBufferReadable(), eventClass);
--- End diff --

similar here: add a comment that checking for custom events is not 
supported anymore


> InputGate may contain data after an EndOfPartitionEvent
> ---
>
> Key: FLINK-8750
> URL: https://issues.apache.org/jira/browse/FLINK-8750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates 
> that there was still some data after an {{EndOfPartitionEvent}} or that 
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  
> Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171001535
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
@@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) {
 *
 * @param buffer the buffer to peak into
 * @param eventClass the expected class of the event type
-* @param classLoader the class loader to use for custom event classes
 * @return whether the event class of the buffer matches the 
given eventClass
 */
-   private static boolean isEvent(ByteBuffer buffer, Class eventClass, 
ClassLoader classLoader) throws IOException {
+   private static boolean isEvent(ByteBuffer buffer, Class eventClass) 
throws IOException {
--- End diff --

You should also add a comment that checking for custom events is not 
supported.


---


[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171001862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
@@ -318,13 +295,9 @@ public static AbstractEvent fromBuffer(Buffer buffer, 
ClassLoader classLoader) t
 *
 * @param buffer the buffer to peak into
 * @param eventClass the expected class of the event type
-* @param classLoader the class loader to use for custom event classes
 * @return whether the event class of the buffer matches the 
given eventClass
 */
-   public static boolean isEvent(final Buffer buffer,
-   final Class eventClass,
-   final ClassLoader classLoader) throws IOException {
-   return !buffer.isBuffer() &&
-   isEvent(buffer.getNioBufferReadable(), eventClass, 
classLoader);
+   public static boolean isEvent(Buffer buffer, Class eventClass) 
throws IOException {
+   return !buffer.isBuffer() && 
isEvent(buffer.getNioBufferReadable(), eventClass);
--- End diff --

similar here: add a comment that checking for custom events is not 
supported anymore


---


[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171002283
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 ---
@@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception {
}
 
/**
-* Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} 
returns
+* Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
 * the correct answer for various encoded event buffers.
 */
@Test
public void testIsEvent() throws Exception {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
-   EndOfSuperstepEvent.INSTANCE,
new CheckpointBarrier(1678L, 4623784L, 
CheckpointOptions.forCheckpointWithDefaultLocation()),
new TestTaskEvent(Math.random(), 12361231273L),
-   new CancelCheckpointMarker(287087987329842L)
+   new CancelCheckpointMarker(287087987329842L),
+   EndOfSuperstepEvent.INSTANCE
+   };
--- End diff --

I wonder why the order of the events changed?


---


[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171001301
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
@@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, 
Class eventClass, ClassLoad
try {
int type = buffer.getInt();
 
-   switch (type) {
-   case END_OF_PARTITION_EVENT:
-   return 
eventClass.equals(EndOfPartitionEvent.class);
-   case CHECKPOINT_BARRIER_EVENT:
-   return 
eventClass.equals(CheckpointBarrier.class);
-   case END_OF_SUPERSTEP_EVENT:
-   return 
eventClass.equals(EndOfSuperstepEvent.class);
-   case CANCEL_CHECKPOINT_MARKER_EVENT:
-   return 
eventClass.equals(CancelCheckpointMarker.class);
-   case OTHER_EVENT:
-   try {
-   final DataInputDeserializer 
deserializer = new DataInputDeserializer(buffer);
-   final String className = 
deserializer.readUTF();
-
-   final Class clazz;
-   try {
-   clazz = 
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
-   }
-   catch (ClassNotFoundException 
e) {
-   throw new 
IOException("Could not load event class '" + className + "'.", e);
-   }
-   catch (ClassCastException e) {
-   throw new 
IOException("The class '" + className + "' is not a valid subclass of '"
-   + 
AbstractEvent.class.getName() + "'.", e);
-   }
-   return eventClass.equals(clazz);
-   }
-   catch (Exception e) {
-   throw new IOException("Error 
while deserializing or instantiating event.", e);
-   }
-   default:
-   throw new IOException("Corrupt byte 
stream for event");
+   if (eventClass.equals(EndOfPartitionEvent.class)) {
+   return type == END_OF_PARTITION_EVENT;
+   } else if (eventClass.equals(CheckpointBarrier.class)) {
+   return type == CHECKPOINT_BARRIER_EVENT;
+   } else if 
(eventClass.equals(EndOfSuperstepEvent.class)) {
+   return type == END_OF_SUPERSTEP_EVENT;
+   } else if 
(eventClass.equals(CancelCheckpointMarker.class)) {
+   return type == CANCEL_CHECKPOINT_MARKER_EVENT;
+   } else {
+   throw new IOException("Corrupt byte stream for 
event or unsupported eventClass = " + eventClass);
--- End diff --

Actually, this should be an `UnsupportedOperationException` since this is 
only based on the class being given and not the input stream.


---


[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5588#discussion_r171000868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
@@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) {
 *
 * @param buffer the buffer to peak into
 * @param eventClass the expected class of the event type
-* @param classLoader the class loader to use for custom event classes
 * @return whether the event class of the buffer matches the 
given eventClass
 */
-   private static boolean isEvent(ByteBuffer buffer, Class eventClass, 
ClassLoader classLoader) throws IOException {
+   private static boolean isEvent(ByteBuffer buffer, Class eventClass) 
throws IOException {
--- End diff --

this change qualifies for a separate JIRA ticket, not just a hotfix


---


[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relies on the backlog for determining whether more data is available

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378965#comment-16378965
 ] 

ASF GitHub Bot commented on FLINK-8755:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5581
  
rebased (not including the newest FLINK-8694 changes!) and addressed the 
comments


> SpilledSubpartitionView wrongly relies on the backlog for determining whether 
> more data is available
> ---
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, 
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog, 
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there 
> are only events left in the buffer queue, their buffers are not included in 
> the backlog counting and, therefore, {{isMoreAvailable}} will wrongly be 
> {{false}} here.
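
For illustration, one possible direction (a simplified sketch with assumed names, not necessarily the actual fix) is to derive availability from the remaining queue content, which still includes events:

{code}
// signal availability from the actual remaining content rather than from
// the buffer-only backlog counter, so trailing events are not dropped
int newBacklog = parent.decreaseBuffersInBacklog(current);
boolean isMoreAvailable = !remainingBuffersAndEvents.isEmpty(); // hypothetical view
return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent);
{code}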



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5581: [FLINK-8755][FLINK-8786][network] fix two bugs in spilled...

2018-02-27 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5581
  
rebased (not including the newest FLINK-8694 changes!) and addressed the 
comments


---


[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relies on the backlog for determining whether more data is available

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378940#comment-16378940
 ] 

ASF GitHub Bot commented on FLINK-8755:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170993496
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -222,59 +219,24 @@ public void testConsumeSpilledPartition() throws 
Exception {
assertEquals(1, listener.getNumNotifications());
 
assertFalse(reader.nextBufferIsEvent()); // buffer
-   BufferAndBacklog read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
assertEquals(2, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-   assertFalse(read.buffer().isRecycled());
-   read.buffer().recycleBuffer();
-   assertTrue(read.buffer().isRecycled());
-   assertFalse(read.nextBufferIsEvent());
 
assertFalse(reader.nextBufferIsEvent()); // buffer
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
assertEquals(1, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-   assertFalse(read.buffer().isRecycled());
-   read.buffer().recycleBuffer();
-   assertTrue(read.buffer().isRecycled());
-   assertTrue(read.nextBufferIsEvent());
 
assertTrue(reader.nextBufferIsEvent()); // event
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertFalse(read.buffer().isBuffer());
+   assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, 
true);
--- End diff --

almost - it remains `@Nullable` in 
`SubpartitionTestBase#assertNextBufferOrEvent`


> SpilledSubpartitionView wrongly relies on the backlog for determining whether 
> more data is available
> ---
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, 
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog, 
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there 
> are only events left in the buffer queue, their buffers are not included in 
> the backlog counting and, therefore, {{isMoreAvailable}} will wrongly be 
> {{false}} here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5581: [FLINK-8755][FLINK-8786][network] fix two bugs in ...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170993496
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -222,59 +219,24 @@ public void testConsumeSpilledPartition() throws 
Exception {
assertEquals(1, listener.getNumNotifications());
 
assertFalse(reader.nextBufferIsEvent()); // buffer
-   BufferAndBacklog read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
assertEquals(2, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-   assertFalse(read.buffer().isRecycled());
-   read.buffer().recycleBuffer();
-   assertTrue(read.buffer().isRecycled());
-   assertFalse(read.nextBufferIsEvent());
 
assertFalse(reader.nextBufferIsEvent()); // buffer
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertTrue(read.buffer().isBuffer());
+   assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
assertEquals(1, partition.getBuffersInBacklog());
-   assertEquals(partition.getBuffersInBacklog(), 
read.buffersInBacklog());
-   assertFalse(read.buffer().isRecycled());
-   read.buffer().recycleBuffer();
-   assertTrue(read.buffer().isRecycled());
-   assertTrue(read.nextBufferIsEvent());
 
assertTrue(reader.nextBufferIsEvent()); // event
-   read = reader.getNextBuffer();
-   assertNotNull(read);
-   assertFalse(read.buffer().isBuffer());
+   assertNextEvent(reader, BUFFER_DATA_SIZE, null, true, 1, false, 
true);
--- End diff --

almost - it remains `@Nullable` in 
`SubpartitionTestBase#assertNextBufferOrEvent`


---


[jira] [Commented] (FLINK-8755) SpilledSubpartitionView wrongly relies on the backlog for determining whether more data is available

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378926#comment-16378926
 ] 

ASF GitHub Bot commented on FLINK-8755:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170990471
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -138,11 +145,68 @@ static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
-   int expectedBuffersInBacklog) throws IOException, 
InterruptedException {
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   true,
+   null,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   static void assertNextEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   @Nullable Class 
expectedEventClass,
+   boolean expectedIsMoreAvailable,
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   false,
+   expectedEventClass,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   private static void assertNextBufferOrEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   boolean expectedIsBuffer,
+   Class expectedEventClass,
+   boolean expectedIsMoreAvailable,
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+   checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
ResultSubpartition.BufferAndBacklog bufferAndBacklog = 
readView.getNextBuffer();
-   assertEquals(expectedReadableBufferSize, 
bufferAndBacklog.buffer().readableBytes());
-   assertEquals(expectedIsMoreAvailable, 
bufferAndBacklog.isMoreAvailable());
-   assertEquals(expectedBuffersInBacklog, 
bufferAndBacklog.buffersInBacklog());
+   assertNotNull(bufferAndBacklog);
+
+   assertEquals("buffer size", expectedReadableBufferSize, 
bufferAndBacklog.buffer().readableBytes());
--- End diff --

Unfortunately yes:
- with the string:
```
java.lang.AssertionError: buffer size 
Expected :1025
Actual   :1024
```
- without the string:
```
java.lang.AssertionError: 
Expected :1025
Actual   :1024
```
It is even worse for boolean values as you may imagine. This way, you can 
immediately get to your test to fix the assumption and do not have to click 
into `SubpartitionTestBase` to identify what was actually wrong.
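
For reference, the per-assertion change is just the leading message argument, e.g.:

```
// "buffer size" is prepended to the AssertionError, naming the mismatched field
assertEquals("buffer size", expectedReadableBufferSize,
    bufferAndBacklog.buffer().readableBytes());
```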



> SpilledSubpartitionView wrongly relies on the backlog for determining whether 
> more data is available
> ---
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, 
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog, 
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there 
> are only events left in the buffer queue, their buffers are not included in 
> the backlog counting and, therefore, {{isMoreAvailable}} will wrongly be 
> {{false}} here.

[GitHub] flink pull request #5581: [FLINK-8755][FLINK-8786][network] fix two bugs in ...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5581#discussion_r170990471
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -138,11 +145,68 @@ static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
-   int expectedBuffersInBacklog) throws IOException, 
InterruptedException {
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   true,
+   null,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   static void assertNextEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   @Nullable Class 
expectedEventClass,
+   boolean expectedIsMoreAvailable,
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+   assertNextBufferOrEvent(
+   readView,
+   expectedReadableBufferSize,
+   false,
+   expectedEventClass,
+   expectedIsMoreAvailable,
+   expectedBuffersInBacklog,
+   expectedNextBufferIsEvent,
+   expectedRecycledAfterRecycle);
+   }
+
+   private static void assertNextBufferOrEvent(
+   ResultSubpartitionView readView,
+   int expectedReadableBufferSize,
+   boolean expectedIsBuffer,
+   Class expectedEventClass,
+   boolean expectedIsMoreAvailable,
+   int expectedBuffersInBacklog,
+   boolean expectedNextBufferIsEvent,
+   boolean expectedRecycledAfterRecycle) throws 
IOException, InterruptedException {
+   checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
ResultSubpartition.BufferAndBacklog bufferAndBacklog = 
readView.getNextBuffer();
-   assertEquals(expectedReadableBufferSize, 
bufferAndBacklog.buffer().readableBytes());
-   assertEquals(expectedIsMoreAvailable, 
bufferAndBacklog.isMoreAvailable());
-   assertEquals(expectedBuffersInBacklog, 
bufferAndBacklog.buffersInBacklog());
+   assertNotNull(bufferAndBacklog);
+
+   assertEquals("buffer size", expectedReadableBufferSize, 
bufferAndBacklog.buffer().readableBytes());
--- End diff --

Unfortunately yes:
- with the string:
```
java.lang.AssertionError: buffer size 
Expected :1025
Actual   :1024
```
- without the string:
```
java.lang.AssertionError: 
Expected :1025
Actual   :1024
```
It is even worse for boolean values as you may imagine. This way, you can 
immediately get to your test to fix the assumption and do not have to click 
into `SubpartitionTestBase` to identify what was actually wrong.



---


[jira] [Commented] (FLINK-8737) Creating a union of UnionGate instances will fail with UnsupportedOperationException when retrieving buffers

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378912#comment-16378912
 ] 

ASF GitHub Bot commented on FLINK-8737:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5583#discussion_r170988543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ---
@@ -61,7 +61,7 @@
  * ++
  * 
  *
- * It is possible to recursively union union input gates.
+ * It is NOT possible to recursively union union input 
gates.
--- End diff --

That would be a more invasive change in the several places that use 
`InputGate`s, and we are actually able to union any other input gate type (if 
there were some) - only the recursive union does not make sense. So I guess 
excluding ourselves is the better approach here in terms of extensibility. 
Think of it as excluding this use not because we did not implement 
`pollNextBufferOrEvent`, but because the recursion does not make sense and we 
do not want it.
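
A sketch of how the exclusion might be enforced (assumed code, the actual PR may differ):

```
public UnionInputGate(InputGate... inputGates) {
    for (InputGate inputGate : inputGates) {
        // recursive unions are not supported anymore - fail fast at construction
        checkArgument(!(inputGate instanceof UnionInputGate),
            "Cannot union a UnionInputGate with another input gate.");
    }
    // ... remaining construction ...
}
```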


> Creating a union of UnionGate instances will fail with 
> UnsupportedOperationException when retrieving buffers
> 
>
> Key: FLINK-8737
> URL: https://issues.apache.org/jira/browse/FLINK-8737
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> FLINK-8589 introduced a new polling method but did not implement 
> {{UnionInputGate#pollNextBufferOrEvent()}}. This prevents UnionGate instances 
> from containing a UnionGate instance, which is explicitly allowed by its 
> documentation and interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5583: [FLINK-8737][network] disallow creating a union of...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5583#discussion_r170988543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ---
@@ -61,7 +61,7 @@
  * ++
  * 
  *
- * It is possible to recursively union union input gates.
+ * It is NOT possible to recursively union union input 
gates.
--- End diff --

That would be a more invasive change in the several places that use 
`InputGate`s, and we are actually able to union any other input gate type (if 
there were some) - only the recursive union does not make sense. So I guess 
excluding ourselves is the better approach here in terms of extensibility. 
Think of it as excluding this use not because we did not implement 
`pollNextBufferOrEvent`, but because the recursion does not make sense and we 
do not want it.


---


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378910#comment-16378910
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua I will take a look this week.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-02-27 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5573
  
@yanghua I will take a look this week.


---


[jira] [Commented] (FLINK-8737) Creating a union of UnionGate instances will fail with UnsupportedOperationException when retrieving buffers

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378907#comment-16378907
 ] 

ASF GitHub Bot commented on FLINK-8737:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5583#discussion_r170987211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ---
@@ -189,11 +189,11 @@ public void requestPartitions() throws IOException, 
InterruptedException {
 
bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
 
-   return Optional.ofNullable(bufferOrEvent);
+   return Optional.of(bufferOrEvent);
}
 
@Override
-   public Optional pollNextBufferOrEvent() throws 
IOException, InterruptedException {
+   public Optional pollNextBufferOrEvent() throws 
UnsupportedOperationException {
--- End diff --

we don't need to - I thought this makes it more explicit if anyone tries to 
use this method in an IDE without looking at the code (which is a mistake imho, 
but commonly done)
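
For context, the shape under discussion (as in the diff above) is roughly:

```
@Override
public Optional<BufferOrEvent> pollNextBufferOrEvent() throws UnsupportedOperationException {
    // the throws clause is redundant for an unchecked exception, but it
    // surfaces the limitation in IDE tooltips and generated javadoc
    throw new UnsupportedOperationException();
}
```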


> Creating a union of UnionGate instances will fail with 
> UnsupportedOperationException when retrieving buffers
> 
>
> Key: FLINK-8737
> URL: https://issues.apache.org/jira/browse/FLINK-8737
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> FLINK-8589 introduced a new polling method but did not implement 
> {{UnionInputGate#pollNextBufferOrEvent()}}. This prevents UnionGate instances 
> from containing a UnionGate instance, which is explicitly allowed by its 
> documentation and interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5583: [FLINK-8737][network] disallow creating a union of...

2018-02-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/5583#discussion_r170987211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ---
@@ -189,11 +189,11 @@ public void requestPartitions() throws IOException, 
InterruptedException {
 
bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
 
-   return Optional.ofNullable(bufferOrEvent);
+   return Optional.of(bufferOrEvent);
}
 
@Override
-   public Optional pollNextBufferOrEvent() throws 
IOException, InterruptedException {
+   public Optional pollNextBufferOrEvent() throws 
UnsupportedOperationException {
--- End diff --

we don't need to - I thought this makes it more explicit if anyone tries to 
use this method in an IDE without looking at the code (which is a mistake imho, 
but commonly done)


---


[jira] [Assigned] (FLINK-8336) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability

2018-02-27 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber reassigned FLINK-8336:
--

Assignee: Nico Kruber

> YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3 test instability
> ---
>
> Key: FLINK-8336
> URL: https://issues.apache.org/jira/browse/FLINK-8336
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3}} fails on 
> Travis. I suspect that this has something to do with the consistency 
> guarantees S3 gives us.
> https://travis-ci.org/tillrohrmann/flink/jobs/323930297



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378890#comment-16378890
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,9 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   private boolean retrieveWithDiscard = true;
--- End diff --

Why do we need this? Is there any case for not doing the cleanup?


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> will be invoked; we can then release every entry from 
> {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID 
> == checkpointID}}. This prevents a resource leak when a job loops through 
> {{local checkpoint completed => failed => local checkpoint completed => 
> failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378891#comment-16378891
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170982734
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
+   Iterator> entryIterator =
+   storedTaskStateByCheckpointID.entrySet().iterator();
+
+   final List> toRemove = new 
ArrayList<>();
+
+   while (entryIterator.hasNext()) {
+
+   Map.Entry snapshotEntry = 
entryIterator.next();
+   long entryCheckpointId = snapshotEntry.getKey();
+
+   if (entryCheckpointId != checkpointID) {
--- End diff --

After a second thought: while I think this code is currently correct, the 
breaking case looks a bit dangerous. Potentially, if the checkpoint id is not 
in the map, this would never stop and would prune ongoing checkpoints. I wonder 
if we should make the `if` a bit more complex, but safer (checking that the 
breaking case never exceeds the checkpoint id). What do you think?
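
A sketch of that safer check (assuming `storedTaskStateByCheckpointID` iterates in ascending checkpoint-id order; names follow the diff):

```
while (entryIterator.hasNext()) {
    Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
    long entryCheckpointId = snapshotEntry.getKey();

    if (breakTheIteration && entryCheckpointId >= checkpointID) {
        // never walk past the restored checkpoint id, even if that exact id
        // is absent from the map - ongoing checkpoints stay untouched
        break;
    }
    if (entryCheckpointId != checkpointID) {
        toRemove.add(snapshotEntry);
    }
}
```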


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> will be invoked; we can then release every entry from 
> {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID 
> == checkpointID}}. This prevents a resource leak when a job loops through 
> {{local checkpoint completed => failed => local checkpoint completed => 
> failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1637#comment-1637
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975600
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
--- End diff --

I suggest adding an assert that the thread holds `lock`, and documenting that 
this method must only be called while holding the lock.
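
Concretely, something like (sketch):

```
/** Prunes obsolete checkpoints - callers must hold {@code lock}. */
private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
    assert Thread.holdsLock(lock);
    // ... pruning logic ...
}
```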


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> will be invoked; we can then release every entry from 
> {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID 
> == checkpointID}}. This prevents a resource leak when a job loops through 
> {{local checkpoint completed => failed => local checkpoint completed => 
> failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long 
checkpointID) {
TaskStateSnapshot snapshot;
synchronized (lock) {
snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   if (retrieveWithDiscard) {
+   // Only the TaskStateSnapshot.checkpointID == 
checkpointID is useful, we remove the others
--- End diff --

Comment is no longer required.


---


[jira] [Assigned] (FLINK-8408) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis

2018-02-27 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber reassigned FLINK-8408:
--

Assignee: Nico Kruber

> YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis
> --
>
> Key: FLINK-8408
> URL: https://issues.apache.org/jira/browse/FLINK-8408
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n}} is unstable 
> on Travis.
> https://travis-ci.org/tillrohrmann/flink/jobs/327216460



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378889#comment-16378889
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long 
checkpointID) {
TaskStateSnapshot snapshot;
synchronized (lock) {
snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   if (retrieveWithDiscard) {
+   // Only the TaskStateSnapshot.checkpointID == 
checkpointID is useful, we remove the others
--- End diff --

Comment is no longer required.


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> will be invoked. We can release every entry from 
> {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID 
> == checkpointID}}; this prevents a resource leak when a job loops through 
> {{local checkpoint completed => failed => local checkpoint completed => 
> failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975600
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Prunes the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
--- End diff --

I suggest adding an assert that the thread holds `lock`, and documenting that 
this method should only be called while holding the lock.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,9 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieving local checkpoint state. */
+   private boolean retrieveWithDiscard = true;
--- End diff --

Why do we need this? Is there any case for not doing the cleanup?


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170982734
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Prunes the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+   storedTaskStateByCheckpointID.entrySet().iterator();
+
+   final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+   while (entryIterator.hasNext()) {
+
+   Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+   long entryCheckpointId = snapshotEntry.getKey();
+
+   if (entryCheckpointId != checkpointID) {
--- End diff --

After a second thought: while I think this code is currently correct, the 
case with breaking looks a bit dangerous. Potentially, if the checkpoint id is 
not there, this would not stop and would prune ongoing checkpoints. I wonder if 
we should make the `if` a bit more complex, but safer (checking that the 
breaking case never exceeds the checkpoint id). What do you think?
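For illustration, a guarded variant of the loop might look like this (a sketch 
only, assuming the map is iterated in ascending checkpoint-id order):

```java
while (entryIterator.hasNext()) {

	Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
	long entryCheckpointId = snapshotEntry.getKey();

	// never prune the target checkpoint or anything newer, even when the
	// target id itself is absent from the map
	if (breakTheIteration && entryCheckpointId >= checkpointID) {
		break;
	}

	if (entryCheckpointId != checkpointID) {
		toRemove.add(snapshotEntry);
		entryIterator.remove();
	}
}
```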


---


[jira] [Updated] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3

2018-02-27 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8801:
---
Fix Version/s: 1.4.3

> S3's eventual consistent read-after-write may fail yarn deployment of 
> resources to S3
> -
>
> Key: FLINK-8801
> URL: https://issues.apache.org/jira/browse/FLINK-8801
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, ResourceManager, YARN
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> According to 
> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:
> {quote}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in 
> your S3 bucket in all regions with one caveat. The caveat is that if you make 
> a HEAD or GET request to the key name (to find if the object exists) before 
> creating the object, Amazon S3 provides eventual consistency for 
> read-after-write.
> {quote}
> Some S3 file system implementations may actually execute such a request for 
> the about-to-write object and thus the read-after-write is only eventually 
> consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently 
> relies on a consistent read-after-write since it accesses the remote resource 
> to get the file size and modification timestamp. Since we have access to the 
> local resource at that point, we can use its data instead and circumvent the 
> problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3

2018-02-27 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-8801:
--

 Summary: S3's eventual consistent read-after-write may fail yarn 
deployment of resources to S3
 Key: FLINK-8801
 URL: https://issues.apache.org/jira/browse/FLINK-8801
 Project: Flink
  Issue Type: Bug
  Components: FileSystem, ResourceManager, YARN
Affects Versions: 1.4.0, 1.5.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.5.0


According to 
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:

{quote}
Amazon S3 provides read-after-write consistency for PUTS of new objects in your 
S3 bucket in all regions with one caveat. The caveat is that if you make a HEAD 
or GET request to the key name (to find if the object exists) before creating 
the object, Amazon S3 provides eventual consistency for read-after-write.
{quote}

Some S3 file system implementations may actually execute such a request for the 
about-to-write object and thus the read-after-write is only eventually 
consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently 
relies on a consistent read-after-write since it accesses the remote resource 
to get the file size and modification timestamp. Since we have access to the 
local resource at that point, we can use its data instead and circumvent the 
problem.
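A rough sketch of that idea (illustrative names only, not the actual change):

{code:java}
import java.io.File;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

// Sketch: build the YARN LocalResource from the local file's metadata instead
// of calling getFileStatus() on the freshly written S3 path, whose
// read-after-write may be only eventually consistent.
static LocalResource localResourceWithoutRemoteLookup(File localFile, Path remotePath) {
	LocalResource resource = Records.newRecord(LocalResource.class);
	resource.setResource(ConverterUtils.getYarnUrlFromPath(remotePath));
	resource.setSize(localFile.length());            // size known locally
	resource.setTimestamp(localFile.lastModified()); // timestamp known locally
	resource.setType(LocalResourceType.FILE);
	resource.setVisibility(LocalResourceVisibility.APPLICATION);
	return resource;
}
{code}

Whether the local modification timestamp is acceptable to YARN's localizer is 
part of what the actual fix has to settle; the sketch only mirrors the idea 
stated above.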



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8798) Make commons-logging a parent-first pattern

2018-02-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8798.
---

> Make commons-logging a parent-first pattern
> ---
>
> Key: FLINK-8798
> URL: https://issues.apache.org/jira/browse/FLINK-8798
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2, 1.6.0
>
>
> The Apache {{commons-logging}} framework does not play well with child-first 
> classloading.
> We need to make this a parent-first pattern.
> As a matter of fact, other frameworks that use inverted classloading (JBoss, 
> Tomcat) force this library to be parent-first as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8798) Make commons-logging a parent-first pattern

2018-02-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8798.
-
Resolution: Fixed

Fixed in
  - 1.4.2 via 392cfaaed9380c5ea38b8593d23023925638cbe3
  - 1.5.0 via a269f8519305faff153e84d729873b6f9497bd36
  - 1.6.0 via 59fb56bc8378645b82ee31e1b3bd07e5045a3698


> Make commons-logging a parent-first pattern
> ---
>
> Key: FLINK-8798
> URL: https://issues.apache.org/jira/browse/FLINK-8798
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2, 1.6.0
>
>
> The Apache {{commons-logging}} framework does not play well with child-first 
> classloading.
> We need to make this a parent-first pattern.
> As a matter of fact, other frameworks that use inverted classloading (JBoss, 
> Tomcat) force this library to be parent-first as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8787) Deploying FLIP-6 YARN session with HA fails

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378868#comment-16378868
 ] 

ASF GitHub Bot commented on FLINK-8787:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5591

[FLINK-8787][flip6] Deploying FLIP-6 YARN session with HA fails

## What is the purpose of the change

*Do not copy `flinkConfiguration` in `AbstractYarnClusterDescriptor`. There 
is code that relies on side effects on the configuration object. This is a 
quick and dirty solution. In the future, the descriptor should be made immutable: 
[FLINK-8799](https://issues.apache.org/jira/browse/FLINK-8799)*
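
A minimal sketch of the quick fix (hypothetical constructor excerpt, not the 
exact patch):

```java
// Before: the defensive copy hid later mutations from the descriptor.
// this.flinkConfiguration = new Configuration(flinkConfiguration);

// After: keep the caller's reference so side effects stay visible.
this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
```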

cc: @tillrohrmann 

## Brief change log

  - *Do not copy `flinkConfiguration` in `AbstractYarnClusterDescriptor`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Manually deployed Flink cluster on YARN with HA enabled.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8787-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5591.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5591


commit 012f656705466cea4ba2e037f7e52f46c0a1bf9f
Author: gyao 
Date:   2018-02-27T15:58:53Z

[FLINK-8787][flip6] Do not copy flinkConfiguration in 
AbstractYarnClusterDescriptor




> Deploying FLIP-6 YARN session with HA fails
> ---
>
> Key: FLINK-8787
> URL: https://issues.apache.org/jira/browse/FLINK-8787
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0
> Environment: emr-5.12.0
> Hadoop distribution: Amazon 2.8.3
> Applications: ZooKeeper 3.4.10
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Starting a YARN session with HA in FLIP-6 mode fails with an exception.
> Commit: 5e3fa4403f518dd6d3fe9970fe8ca55871add7c9
> Command to start YARN session:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> HADOOP_CONF_DIR=/etc/hadoop/conf bin/yarn-session.sh -d -n 1 -s 1 -jm 2048 
> -tm 2048
> {noformat}
> Stacktrace:
> {noformat}
> java.lang.reflect.UndeclaredThrowableException
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1854)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:790)
> Caused by: org.apache.flink.util.FlinkException: Could not write the Yarn 
> connection information.
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:612)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:790)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>  ... 2 more
> Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: 
> Could not retrieve the leader address and leader session ID.
>  at 
> org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:116)
>  at 
> org.apache.flink.client.program.rest.RestClusterClient.getClusterConnectionInfo(RestClusterClient.java:405)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:589)
>  ... 6 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
> [6 milliseconds]
>  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>  at 

[jira] [Created] (FLINK-8800) Set Logging to TRACE for org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler

2018-02-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8800:
---

 Summary: Set Logging to TRACE for 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler
 Key: FLINK-8800
 URL: https://issues.apache.org/jira/browse/FLINK-8800
 Project: Flink
  Issue Type: Bug
  Components: REST
Reporter: Stephan Ewen
 Fix For: 1.5.0, 1.6.0


When setting the log level to {{DEBUG}}, the logs are swamped with statements 
like the ones below, making it hard to read the debug logs.

{code}
2018-02-22 13:41:04,016 DEBUG 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler  - 
Received request 
/jobs/ec1c9d7a3c413a9523656efa58735009/vertices/ded95c643b42f31cf882a8986207fd30/metrics?get=0.currentLowWatermark.
2018-02-22 13:41:04,048 DEBUG 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler  - 
Received request 
/jobs/ec1c9d7a3c413a9523656efa58735009/vertices/eec5890dac9c38f66954443809beb5b0/metrics?get=0.currentLowWatermark.
2018-02-22 13:41:04,052 DEBUG 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler  - 
Received request 
/jobs/ec1c9d7a3c413a9523656efa58735009/vertices/2a964ee72788c82cb7d15e352d9a94f6/metrics?get=0.currentLowWatermark.
2018-02-22 13:41:04,079 DEBUG 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler  - 
Received request 
/jobs/ec1c9d7a3c413a9523656efa58735009/vertices/1d9c83f6e1879fdbe461aafac16eb8a5/metrics?get=0.currentLowWatermark.
2018-02-22 13:41:04,085 DEBUG 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler  - 
Received request 
/jobs/ec1c9d7a3c413a9523656efa58735009/vertices/4063620891a151092c5bcedb218870a6/metrics?get=0.currentLowWatermark.
2018-02-22 13:41:04,094 DEBUG 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler  - 
Received request 
/jobs/ec1c9d7a3c413a9523656efa58735009/vertices/2a751c66e0e32aee2cd8120a1a72a4d6/metrics?get=0.currentLowWatermark.
2018-02-22 13:41:04,142 DEBUG 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler  - 
Received request 
/jobs/ec1c9d7a3c413a9523656efa58735009/vertices/37ecc85b429bd08d0fd539532055e117/metrics?get=0.currentLowWatermark.
2018-02-22 13:41:04,173 DEBUG 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler  - 
Received request 
/jobs/ec1c9d7a3c413a9523656efa58735009/vertices/20e20298680571979f690d36d1a6db36/metrics?get=0.currentLowWatermark.
{code}
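
Until then, one way to quiet this logger (assuming the default log4j 1.x setup 
that Flink ships) is an override in log4j.properties:

{code}
# illustrative: hide the per-request DEBUG statements of this handler
log4j.logger.org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler=INFO
{code}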



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5591: [FLINK-8787][flip6] Deploying FLIP-6 YARN session ...

2018-02-27 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5591

[FLINK-8787][flip6] Deploying FLIP-6 YARN session with HA fails

## What is the purpose of the change

*Do not copy `flinkConfiguration` in `AbstractYarnClusterDescriptor`. There 
is code that relies on side effects on the configuration object. This is a 
quick and dirty solution. In the future, the descriptor should be made immutable: 
[FLINK-8799](https://issues.apache.org/jira/browse/FLINK-8799)*

cc: @tillrohrmann 

## Brief change log

  - *Do not copy `flinkConfiguration` in `AbstractYarnClusterDescriptor`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Manually deployed Flink cluster on YARN with HA enabled.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8787-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5591.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5591


commit 012f656705466cea4ba2e037f7e52f46c0a1bf9f
Author: gyao 
Date:   2018-02-27T15:58:53Z

[FLINK-8787][flip6] Do not copy flinkConfiguration in 
AbstractYarnClusterDescriptor




---


[jira] [Resolved] (FLINK-8798) Make commons-logging a parent-first pattern

2018-02-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8798.
-
Resolution: Fixed

Fixed in
  - 1.4.2 via 392cfaaed9380c5ea38b8593d23023925638cbe3

> Make commons-logging a parent-first pattern
> ---
>
> Key: FLINK-8798
> URL: https://issues.apache.org/jira/browse/FLINK-8798
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2, 1.6.0
>
>
> The Apache {{commons-logging}} framework does not play well with child-first 
> classloading.
> We need to make this a parent-first pattern.
> As a matter of fact, other frameworks that use inverted classloading (JBoss, 
> Tomcat) force this library to be parent-first as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8799) Make AbstractYarnClusterDescriptor immutable

2018-02-27 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8799:
---

 Summary: Make AbstractYarnClusterDescriptor immutable
 Key: FLINK-8799
 URL: https://issues.apache.org/jira/browse/FLINK-8799
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: Gary Yao
 Fix For: 1.6.0


{{AbstractYarnClusterDescriptor}} should be made immutable. Currently, its 
internal configuration is modified from different places which makes it 
difficult to reason about the code. For example, it should not be possible to 
modify the {{zookeeperNamespace}} using a setter method. A user of this class 
should be forced to provide all information prior to creating the instance, 
e.g., by passing a {{org.apache.flink.configuration.Configuration}} object.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-8798) Make commons-logging a parent-first pattern

2018-02-27 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reopened FLINK-8798:
-

prematurely closed

> Make commons-logging a parent-first pattern
> ---
>
> Key: FLINK-8798
> URL: https://issues.apache.org/jira/browse/FLINK-8798
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.2, 1.6.0
>
>
> The Apache {{commons-logging}} framework does not play well with child-first 
> classloading.
> We need to make this a parent-first pattern.
> As a matter of fact, other frameworks that use inverted classloading (JBoss, 
> Tomcat) force this library to be parent-first as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5582: [FLINK-8790][State] Improve performance for recovery from...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5582
  
Thanks for the contribution! We are currently busy with the 1.5 release. I 
will have a closer look at this PR and your other pending JIRAs after the 
release is out.


---


[jira] [Commented] (FLINK-8790) Improve performance for recovery from incremental checkpoint

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378850#comment-16378850
 ] 

ASF GitHub Bot commented on FLINK-8790:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5582
  
Thanks for the contribution! We are currently busy with the 1.5 release. I 
will have a closer look at this PR and your other pending JIRAs after the 
release is out.


> Improve performance for recovery from incremental checkpoint
> 
>
> Key: FLINK-8790
> URL: https://issues.apache.org/jira/browse/FLINK-8790
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When there are multiple state handles to be restored, we can improve the 
> performance as follows:
> 1. Choose the best state handle to init the target db.
> 2. Use the other state handles to create temp dbs, and clip each db according 
> to the target key group range (via rocksdb.deleteRange()); this lets us get 
> rid of the `key group check` in the `data insertion loop` and also avoids 
> traversing useless records.
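
For illustration, the clipping in step 2 could look roughly like this in the 
RocksDB Java API (a sketch, assuming key-group ids form the key prefix):

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Sketch: drop everything outside the target key-group range from a temp db,
// so the insertion loop no longer needs a per-record key-group check.
static void clipToKeyGroupRange(
		RocksDB tempDb,
		ColumnFamilyHandle handle,
		byte[] smallestKey,         // lower bound of all keys in the db
		byte[] rangeBegin,          // first key of the target key groups
		byte[] rangeEnd,            // first key after the target key groups
		byte[] largestKeyExclusive) // upper bound beyond all keys
		throws RocksDBException {

	tempDb.deleteRange(handle, smallestKey, rangeBegin);       // below the range
	tempDb.deleteRange(handle, rangeEnd, largestKeyExclusive); // above the range
}
{code}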



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8720) Logging exception with S3 connector and BucketingSink

2018-02-27 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378844#comment-16378844
 ] 

Stephan Ewen commented on FLINK-8720:
-

The proper fix is coming, tracked in FLINK-8798

> Logging exception with S3 connector and BucketingSink
> -
>
> Key: FLINK-8720
> URL: https://issues.apache.org/jira/browse/FLINK-8720
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.1
>Reporter: dejan miljkovic
>Priority: Critical
>
> Trying to stream data to S3. The code works from IntelliJ. When submitting the 
> code through the UI on my machine (a single-node cluster started by the 
> start-cluster.sh script), the stack trace below is produced.
>  
> Below is the link to the simple test app that is streaming data to S3. 
> [https://github.com/dmiljkovic/test-flink-bucketingsink-s3]
> The behavior is a bit different but the same error is produced. The job works 
> only once. If the job is submitted a second time, the stack trace below is 
> produced. If I restart the cluster, the job works but only for the first time.
>  
>  
> org.apache.commons.logging.LogConfigurationException: 
> java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3 (Caused by 
> java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310)
>   at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:76)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:102)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:88)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:96)
>   at 
> com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
>   at 
> com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
>   at com.amazonaws.http.AmazonHttpClient.(AmazonHttpClient.java:158)
>   at 
> com.amazonaws.AmazonWebServiceClient.(AmazonWebServiceClient.java:119)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:389)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:371)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604)
>   ... 26 more
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8798) Make commons-logging a parent-first pattern

2018-02-27 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8798:
---

 Summary: Make commons-logging a parent-first pattern
 Key: FLINK-8798
 URL: https://issues.apache.org/jira/browse/FLINK-8798
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.1
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.0, 1.4.2, 1.6.0


The Apache {{commons-logging}} framework does not play well with child-first 
classloading.

We need to make this a parent-first pattern.

As a matter of fact, other frameworks that use inverted classloading (JBoss, 
Tomcat) force this library to be parent-first as well.
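
As a stop-gap on affected versions, users can extend the pattern list themselves 
(a sketch; the option name is {{classloader.parent-first-patterns}}, and the 
exact default list may differ by version):

{code}
# flink-conf.yaml (illustrative): append commons-logging to the default patterns
classloader.parent-first-patterns: "java.;scala.;org.apache.flink.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback;org.apache.commons.logging"
{code}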



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-27 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5500
  
If this security concern is an issue, maybe in that case `getCurrentKey()` 
should take `object-reuse` into account, and if `object-reuse` is set to false, 
make a defensive copy of the key on access?
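
A sketch of what that could look like (hypothetical accessor, not the actual 
API):

```java
// Hypothetical getter: hand out the internally held key directly only when
// object reuse is enabled; otherwise return a defensive copy.
public K getCurrentKey() {
	K key = currentKey; // internally held key instance (illustrative field)
	return executionConfig.isObjectReuseEnabled()
			? key
			: keySerializer.copy(key);
}
```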

However @aljoscha was not objecting the general idea in the other PR: 
https://github.com/apache/flink/pull/5481


---


[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378819#comment-16378819
 ] 

ASF GitHub Bot commented on FLINK-8667:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5500
  
If this security concern is an issue, maybe in that case `getCurrentKey()` 
should take `object-reuse` into account, and if `object-reuse` is set to false, 
make a defensive copy of the key on access?

However @aljoscha was not objecting the general idea in the other PR: 
https://github.com/apache/flink/pull/5481


> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> [~aljoscha] [~pnowojski]  
> Since KeyedBroadcastProcessFunction is about to get out of the door, I think 
> it will be great to expose the timer's key in KeyedBroadcastProcessFunction 
> too. If we don't do it now, it will be much more difficult to add the feature 
> later because of user-app compatibility issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378818#comment-16378818
 ] 

ASF GitHub Bot commented on FLINK-8667:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5500
  
If also @aljoscha is ok, I can review this PR and merge it.



> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> [~aljoscha] [~pnowojski]  
> Since KeyedBroadcastProcessFunction is about to get out of the door, I think 
> it will be great to expose the timer's key in KeyedBroadcastProcessFunction 
> too. If we don't do it now, it will be much more difficult to add the feature 
> later because of user-app compatibility issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8360) Implement task-local state recovery

2018-02-27 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-8360.
-
Resolution: Fixed

> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.
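
Conceptually, recovery then becomes a two-level lookup (a sketch, not the actual 
Flink code):

{code:java}
// Prefer the secondary task-local snapshot; fall back to the primary DFS copy.
TaskStateSnapshot snapshot = localStateStore.retrieveLocalState(checkpointId);
if (snapshot == null) {
	snapshot = restoreFromPrimaryDfsCopy(checkpointId); // illustrative fallback
}
{code}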



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-27 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5500
  
If also @aljoscha is ok, I can review this PR and merge it.



---


[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-27 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378805#comment-16378805
 ] 

Aljoscha Krettek commented on FLINK-7756:
-

[~shashank734] Not right now. But thanks for providing the example, we're 
having a look at it and are trying to figure out what's going wrong.

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jobmanager.log, jobmanager_without_cassandra.log, 
> taskmanager.log, taskmanager_without_cassandra.log
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses HDFS 
> as its file system) it crashes. But when I use FsStateBackend on staging (also 
> on HDFS) it works fine.
> Locally, with the local file system, it works fine in both cases.
> Please check attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   

[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378802#comment-16378802
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r170966200
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -621,12 +621,70 @@ public void runStartFromSpecificOffsets() throws 
Exception {
partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 
22)); // partition 2 should read offset 22-49
partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 
0));  // partition 3 should read offset 0-49
 
-   readSequence(env, StartupMode.SPECIFIC_OFFSETS, 
specificStartupOffsets, readProps, topicName, 
partitionsToValueCountAndStartOffsets);
+   readSequence(env, StartupMode.SPECIFIC_OFFSETS, 
specificStartupOffsets, null, readProps, topicName, 
partitionsToValueCountAndStartOffsets);
 
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
 
+   /**
+* This test ensures that the consumer correctly uses user-supplied 
timestamp when explicitly configured to
+* start from timestamp.
+*
+* The validated Kafka data is written in 2 steps: first, an initial 50 records are written to each partition.
+* After that, another 30 records are appended to each partition. Before each step, a timestamp is recorded.
+* For the validation, when the read job is configured to start from the first timestamp, each partition should start
+* from offset 0 and read a total of 80 records. When configured to start from the second timestamp,
+* each partition should start from offset 50 and read the remaining 30 appended records.
+*/
+   public void runStartFromTimestamp() throws Exception {
+   // 4 partitions with 50 records each
+   final int parallelism = 4;
+   final int initialRecordsInEachPartition = 50;
+   final int appendRecordsInEachPartition = 30;
+
+   long firstTimestamp = 0;
+   long secondTimestamp = 0;
+   String topic = "";
+
+   // attempt to create an appended test sequence, where the 
timestamp of writing the appended sequence
+   // is assured to be larger than the timestamp of the original 
sequence.
+   final int maxRetries = 3;
+   int attempt = 0;
+   while (attempt != maxRetries) {
+   firstTimestamp = System.currentTimeMillis();
+   topic = writeSequence("runStartFromTimestamp", 
initialRecordsInEachPartition, parallelism, 1);
--- End diff --

Ah, I just thought that we could have a simple loop there:

```
long secondTimestamp = System.currentTimeMillis();
while (secondTimestamp <= firstTimestamp) {
  Thread.sleep(50L); // any short pause works; retry until the clock advances
  secondTimestamp = System.currentTimeMillis();
}
```
what do you think?


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of 
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method 
> can only let the job consume from the beginning or the most recent data, but 
> cannot specify the exact offset from which Kafka should begin to consume. 
> So, there should be a configuration item (such as 
> "flink.source.start.time" with the format "yyyy-MM-dd HH:mm:ss") that 
> allows the user to configure the initial offset of Kafka. The action of 
> "flink.source.start.time" is as follows:
> 1) The job starts from a checkpoint/savepoint:
>   a> if the partition's offset can be restored from the checkpoint/savepoint, 
> "flink.source.start.time" will be ignored.
>   b> if there is no checkpoint/savepoint for the partition (for example, the 
> partition was newly added), "flink.kafka.start.time" will be used to 
> initialize the offset of the partition.
> 2) The job has no checkpoint/savepoint; "flink.source.start.time" is used 
> to initialize the Kafka offset:
>   a> if "flink.source.start.time" is valid, use it to set the Kafka offset.
>   b> if "flink.source.start.time" is out-of-range, behave the same as it does 
> currently with no 

[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

2018-02-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r170966200
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -621,12 +621,70 @@ public void runStartFromSpecificOffsets() throws 
Exception {
partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 
22)); // partition 2 should read offset 22-49
partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 
0));  // partition 3 should read offset 0-49
 
-   readSequence(env, StartupMode.SPECIFIC_OFFSETS, 
specificStartupOffsets, readProps, topicName, 
partitionsToValueCountAndStartOffsets);
+   readSequence(env, StartupMode.SPECIFIC_OFFSETS, 
specificStartupOffsets, null, readProps, topicName, 
partitionsToValueCountAndStartOffsets);
 
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
 
+   /**
+* This test ensures that the consumer correctly uses user-supplied 
timestamp when explicitly configured to
+* start from timestamp.
+*
+* The validated Kafka data is written in 2 steps: first, an initial 50 records are written to each partition.
+* After that, another 30 records are appended to each partition. Before each step, a timestamp is recorded.
+* For the validation, when the read job is configured to start from the first timestamp, each partition should start
+* from offset 0 and read a total of 80 records. When configured to start from the second timestamp,
+* each partition should start from offset 50 and read the remaining 30 appended records.
+*/
+   public void runStartFromTimestamp() throws Exception {
+   // 4 partitions with 50 records each
+   final int parallelism = 4;
+   final int initialRecordsInEachPartition = 50;
+   final int appendRecordsInEachPartition = 30;
+
+   long firstTimestamp = 0;
+   long secondTimestamp = 0;
+   String topic = "";
+
+   // attempt to create an appended test sequence, where the 
timestamp of writing the appended sequence
+   // is assured to be larger than the timestamp of the original 
sequence.
+   final int maxRetries = 3;
+   int attempt = 0;
+   while (attempt != maxRetries) {
+   firstTimestamp = System.currentTimeMillis();
+   topic = writeSequence("runStartFromTimestamp", 
initialRecordsInEachPartition, parallelism, 1);
--- End diff --

Ah, I just thought that we could have a simple loop there:

```
long secondTimestamp = System.currentTimeMillis();
while (secondTimestamp <= firstTimestamp) {
  Thread.sleep(50L); // any short pause works; retry until the clock advances
  secondTimestamp = System.currentTimeMillis();
}
```
what do you think?


---


[jira] [Commented] (FLINK-8777) improve resource release when recovery from failover

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378796#comment-16378796
 ] 

ASF GitHub Bot commented on FLINK-8777:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965682
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -126,30 +133,54 @@ public void storeLocalState(
LOG.info("Storing local state for checkpoint {}.", 
checkpointId);
LOG.debug("Local state for checkpoint {} is {}.", checkpointId, 
localState);
 
-   Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+   Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
 
synchronized (lock) {
if (disposed) {
// we ignore late stores and simply discard the 
state.
-   toDiscard.put(checkpointId, localState);
+   toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, localState);
} else {
TaskStateSnapshot previous =

storedTaskStateByCheckpointID.put(checkpointId, localState);
 
if (previous != null) {
-   toDiscard.put(checkpointId, previous);
+   toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, previous);
}
}
}
 
-   asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+   if (toDiscard != null) {
+   
asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+   }
}
 
@Override
@Nullable
public TaskStateSnapshot retrieveLocalState(long checkpointID) {
synchronized (lock) {
TaskStateSnapshot snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
--- End diff --

addressed.


> improve resource release when recovery from failover
> 
>
> Key: FLINK-8777
> URL: https://issues.apache.org/jira/browse/FLINK-8777
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} 
> will be invoked. We can release every entry from 
> {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID 
> == checkpointID}}; this prevents a resource leak when a job loops through 
> {{local checkpoint completed => failed => local checkpoint completed => 
> failed ...}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

