[jira] [Comment Edited] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer

2016-10-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584364#comment-15584364
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4723 at 10/18/16 3:58 AM:
--

[~rmetzger] Should this fix go into release-1.1 as well?
It will affect the behaviour of committed offsets for Kafka 0.8 users, so I'm 
not quite sure.


was (Author: tzulitai):
[~rmetzger] Should this fix go into release-1.1 as well?

> Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 
> consumer
> -
>
> Key: FLINK-4723
> URL: https://issues.apache.org/jira/browse/FLINK-4723
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.3
>
>
> The proper "behaviour" of the offsets committed back to Kafka / ZK should be 
> "the next offset that consumers should read (in Kafka terms, the 'position')".
> This is already fixed for the 0.9 consumer by FLINK-4618, by incrementing the 
> committed offsets back to Kafka by the 0.9 by 1, so that the internal 
> {{KafkaConsumer}} picks up the correct start position when committed offsets 
> are present. This fix was required because the start position from committed 
> offsets was implicitly determined with Kafka 0.9 APIs.
> However, since the 0.8 consumer handles offset committing and start position 
> using Flink's own {{ZookeeperOffsetHandler}} and not Kafka's high-level APIs, 
> the 0.8 consumer did not require a fix.
> I propose to still unify the behaviour of committed offsets across 0.8 and 
> 0.9 to the definition above.
> Otherwise, if users in any case first uses the 0.8 consumer to read data and 
> have Flink-committed offsets in ZK, and then uses a high-level 0.8 Kafka 
> consumer to read the same topic in a non-Flink application, the first record 
> will be duplicate (because, like described above, Kafka high-level consumers 
> expect the committed offsets to be "the next record to process" and not "the 
> last processed record").
> This requires incrementing the committed ZK offsets in 0.8 to also be 
> incremented by 1, and changing how Flink internal offsets are initialized 
> with accordance to the acquired ZK offsets.
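
For illustration, a minimal sketch of the unified commit semantics (hypothetical names such as {{offsetHandler}} and {{consumer}}; this is not the actual connector code):

{code}
// Flink tracks the offset of the last record it emitted per partition.
// Under the unified definition, the value committed to Kafka / ZK is that
// offset + 1, i.e. the next record to read (Kafka's "position").
long offsetToCommit = lastProcessedOffset + 1;
offsetHandler.commit(partition, offsetToCommit);

// On restore, the committed value is already the next offset to read,
// so consumption starts from it directly, without adding 1 again.
long startOffset = offsetHandler.fetch(partition);
consumer.seek(partition, startOffset);
{code}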



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer

2016-10-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584364#comment-15584364
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4723:


[~rmetzger] Should this fix go into release-1.1 as well?



[jira] [Resolved] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer

2016-10-17 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-4723.

Resolution: Fixed



[jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer

2016-10-17 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584361#comment-15584361
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4723:


Resolved for master via 
http://git-wip-us.apache.org/repos/asf/flink/commit/f46ca39





[jira] [Commented] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584340#comment-15584340
 ] 

ASF GitHub Bot commented on FLINK-4727:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2585
  
Rebasing to include f46ca39 with the IT tests de-commented. Will merge when 
Travis turns green.


> Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no 
> data is read
> --
>
> Key: FLINK-4727
> URL: https://issues.apache.org/jira/browse/FLINK-4727
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> This is basically the 0.9 counterpart of FLINK-3440.
> When the 0.9 consumer fetches initial offsets from Kafka on startup but does
> not have any data to read, it should still checkpoint & commit these initial
> offsets.
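
A minimal sketch of the intended behaviour (names assumed for illustration only, not the connector internals):

{code}
// On startup the consumer seeds its offset map with the offsets
// auto-retrieved from Kafka, rather than a "no offset yet" sentinel.
Map<KafkaTopicPartition, Long> offsets = fetchCommittedOffsetsFromKafka();

// snapshotState() then snapshots the current map as-is, so a partition
// that produced no records still checkpoints (and later commits) its
// initial offset instead of being skipped.
Map<KafkaTopicPartition, Long> snapshot = new HashMap<>(offsets);
{code}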





[jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584332#comment-15584332
 ] 

ASF GitHub Bot commented on FLINK-4723:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2580








[jira] [Commented] (FLINK-3931) Implement Transport Encryption (SSL/TLS)

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584329#comment-15584329
 ] 

ASF GitHub Bot commented on FLINK-3931:
---

Github user skrishnappa commented on the issue:

https://github.com/apache/flink/pull/2518
  
Thanks @mxm for following this up. Cheers


> Implement Transport Encryption (SSL/TLS)
> 
>
> Key: FLINK-3931
> URL: https://issues.apache.org/jira/browse/FLINK-3931
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Suresh Krishnappa
>  Labels: security
> Fix For: 1.2.0
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> To ensure privacy and data integrity between Flink components, enable TLS for
> all communication channels. As described in the design doc:
> - Accept a configured certificate or generate a certificate.
> - Enable Akka SSL
> - Implement Data Transfer SSL
> - Implement Blob Server SSL
> - Implement Web UI HTTPS





[jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584323#comment-15584323
 ] 

ASF GitHub Bot commented on FLINK-4723:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2580
  
Merging this to master now ...






[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584306#comment-15584306
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2653
  
Hi @twalthr @fhueske, it would be great if you could review this.


> Add support for user defined table function in Table API & SQL
> --
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row
> and output a single output row. In contrast, table-generating functions
> transform a single input row into multiple output rows. This is very useful
> in some cases, such as looking up rows in HBase by rowkey and returning one
> or more rows.
> Adding a user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval functions
> NOTE:
> 1. the eval method must be public and non-static.
> 2. eval should always return java.lang.Iterable or scala.collection.Iterable
> with the generic type T.
> 3. the generic type T is the row type returned by the table function. Because
> of Java type erasure, we can't extract T from the Iterable.
> 4. the eval method can be overloaded. Flink will choose the best matching
> eval method to call according to the parameter types and count.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
>
> public class SplitStringUDTF extends UDTF<Word> {
>   public Iterable<Word> eval(String str) {
>     if (str == null) {
>       return new ArrayList<>();
>     } else {
>       List<Word> list = new ArrayList<>();
>       for (String s : str.split(",")) {
>         Word word = new Word(s, s.length());
>         list.add(word);
>       }
>       return list;
>     }
>   }
> }
>
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)")
>
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to "w" and "l"
> table.crossApply("split(c)", "w, l")
>      .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>      .select("a, b, word, length")
>
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c), 'w, 'l)
>      .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> Here we introduce CROSS/OUTER APPLY keywords to join table functions, as used
> in SQL Server. We can discuss the API in the comments.
> Maybe the {{UDTF}} class should be replaced by {{TableFunction}} or something
> else, because we have introduced {{ScalarFunction}} for custom functions and
> we need to stay consistent. Still, I prefer {{UDTF}} over {{TableFunction}},
> as the former is more SQL-like and the latter may be confused with DataStream
> functions.
> **This issue is blocked by CALCITE-1309, so we need to wait for Calcite to
> fix this and make a release.**
> See [1] for more information about the UDTF design.
> [1]
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#







[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584302#comment-15584302
 ] 

ASF GitHub Bot commented on FLINK-4469:
---

GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2653

[FLINK-4469] [table] Add support for user defined table function in Table 
API & SQL

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR introduces user-defined table functions for the Table and SQL API.
I will add documentation after this proposal is accepted. This is the 
general syntax so far:

In Java:

```java
public class Split extends TableFunction<String> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            collect(s);
        }
    }
}

tableEnv.registerFunction("split", new Split());
// cross apply
Table result = table.crossApply("split(c)", "s").select("c, s");
Table sqlResult = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable CROSS APPLY split(c) as t(s)");

// outer apply
Table outerResult = table.outerApply("split(c)", "s").select("c, s");
Table outerSqlResult = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable OUTER APPLY split(c) as t(s)");
```

In Scala:

```scala
object Split extends TableFunction[String] {
  def eval(str: String): Unit = {
    str.split(" ").foreach(collect)
  }
}

tableEnv.registerFunction("split", Split)

// cross apply
val result = table.crossApply(Split('c) as ('s)).select('c, 's)
val sqlResult = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable CROSS APPLY split(c) as t(s)")

// outer apply
val outerResult = table.outerApply(Split('c) as ('s)).select('c, 's)
val outerSqlResult = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable OUTER APPLY split(c) as t(s)")
```




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink udtf-FLINK-4469

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2653


commit 60812e51156ec9fa6088154d2f6dea8c1ff9ac17
Author: Jark Wu 
Date:   2016-10-18T03:15:07Z

[FLINK-4469] [table] Add support for user defined table function in Table 
API & SQL







[jira] [Commented] (FLINK-4315) Deprecate Hadoop dependent methods in flink-java

2016-10-17 Thread Evgeny Kincharov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584198#comment-15584198
 ] 

Evgeny Kincharov commented on FLINK-4315:
-

I have pushed a new version of the PR. Could you take a look?
BR, Evgeny

> Deprecate Hadoop dependent methods in flink-java
> 
>
> Key: FLINK-4315
> URL: https://issues.apache.org/jira/browse/FLINK-4315
> Project: Flink
>  Issue Type: Task
>  Components: Java API
>Reporter: Stephan Ewen
>Assignee: Evgeny Kincharov
> Fix For: 2.0.0
>
>
> The API projects should be independent of Hadoop, because Hadoop is not an 
> integral part of the Flink stack, and we should have the option to offer 
> Flink without Hadoop dependencies.
> The current batch APIs have a hard dependency on Hadoop, mainly because the 
> API has utility methods like `readHadoopFile(...)`.
> I suggest deprecating those methods and adding helpers in the
> `flink-hadoop-compatibility` project.
> FLINK-4048 will later remove the deprecated methods.
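
For illustration, the deprecation could look like the following sketch (the signature mirrors the existing readHadoopFile overloads, but treat the details as an assumption rather than the final change):

{code}
/**
 * @deprecated Please use the helpers in the flink-hadoop-compatibility
 * project instead.
 */
@Deprecated
public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        String inputPath) {
    // existing behaviour is kept; only the annotation and Javadoc are new
    return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
}
{code}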





[jira] [Updated] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext

2016-10-17 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4849:
--
Description: 
{code}
String trustStorePassword = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
    null);
...
try {
    trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
    trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
{code}
If trustStorePassword is null, the load() call would throw an NPE.

  was:
{code}
String keystoreFilePath = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_KEYSTORE,
    null);
...
try {
    keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File ctor would throw an NPE.


> trustStorePassword should be checked against null in
> SSLUtils#createSSLClientContext
> 
>
> Key: FLINK-4849
> URL: https://issues.apache.org/jira/browse/FLINK-4849
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> String trustStorePassword = sslConfig.getString(
>     ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
>     null);
> ...
> try {
>     trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
>     trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
> {code}
> If trustStorePassword is null, the load() call would throw an NPE.
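
A minimal sketch of the suggested guard (illustrative only, not the committed fix):

{code}
// KeyStore.load() accepts a null password array, so mapping a missing
// password to null avoids the NPE; alternatively, fail fast with a
// descriptive exception if a password is mandatory in this setup.
char[] password = trustStorePassword != null ? trustStorePassword.toCharArray() : null;
trustStore.load(trustStoreFile, password);
{code}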





[jira] [Issue Comment Deleted] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext

2016-10-17 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4849:
--
Comment: was deleted

(was: Dup of FLINK-4848)



[jira] [Reopened] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext

2016-10-17 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu reopened FLINK-4849:
---



[jira] [Updated] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext

2016-10-17 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4849:
--
Summary: trustStorePassword should be checked against null in 
SSLUtils#createSSLClientContext  (was: keystoreFilePath should be checked 
against null in SSLUtils#createSSLServerContext)



[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-10-17 Thread Philipp von dem Bussche (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582974#comment-15582974
 ] 

Philipp von dem Bussche commented on FLINK-2821:


Thanks [~mxm], I am not getting this exception anymore; however, I don't think
this is working yet.
I have to admit though that I had to change my environment slightly in which I 
am testing since I am currently travelling. I don't at the moment have access 
to the Rancher environment so I am purely bringing up a Docker container on my 
Mac within a (non-native) docker-machine which basically means I have a 
virtualbox virtual machine running on my Mac which runs the Docker daemon and 
from this virtual machine I am running my Docker containers at the moment. I do 
believe though that this test environment is quite similar to my initial test 
with Rancher. I have exposed port 6123 from the docker container to the host 
(aka the virtual machine).

This happens on my non-customized 1.1.3 build (not the one you have created for 
me):
I am trying to access my Flink's jobmanager rpc address (doing a simple flink 
list from my Mac) like this:

PHILIPPs-MacBook:~ philipp$ flink list --jobmanager 192.168.99.100:6123 # 
192.168.99.100 is the docker host's IP / the IP of the virtual machine

I am getting this error message after a while:

Retrieving JobManager.
Using address /192.168.99.100:6123 to connect to JobManager.


 The program finished with the following exception:

org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not 
retrieve the leader gateway
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:127)
at 
org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:644)
at 
org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:868)
at org.apache.flink.client.CliFrontend.list(CliFrontend.java:387)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1008)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:125)
... 5 more

And in my Flink's jobmanager log file I am seeing this error message:

2016-10-17 17:58:46,088 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Starting JobManager at 
akka.tcp://flink@172.17.0.2:6123/user/jobmanager.
2016-10-17 17:58:46,108 INFO  
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  
- Trying to associate with JobManager leader 
akka.tcp://flink@172.17.0.2:6123/user/jobmanager
2016-10-17 17:58:46,132 INFO  org.apache.flink.runtime.jobmanager.JobManager
- JobManager akka.tcp://flink@172.17.0.2:6123/user/jobmanager was 
granted leadership with leader session ID None.
2016-10-17 17:58:46,140 INFO  
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  
- Resource Manager associating with leading JobManager 
Actor[akka://flink/user/jobmanager#-1164381512] - leader session null
2016-10-17 17:59:34,896 ERROR akka.remote.EndpointWriter
- dropping message [class akka.actor.ActorSelectionMessage] for 
non-local recipient [Actor[akka.tcp://flink@192.168.99.100:6123/]] arriving at 
[akka.tcp://flink@192.168.99.100:6123] inbound addresses are 
[akka.tcp://flink@172.17.0.2:6123]
2016-10-17 17:59:45,052 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system 
[akka.tcp://flink@192.168.99.1:51492] has failed, address is now gated for 
[5000] ms. Reason is: [Disassociated].

I would think that the difference between this and the Rancher approach would 
be that Rancher introduces this third IP address (10.x) which gets used when 
using the Rancher DNS name between containers in a Rancher environment.

Anyways when I am using the custom version that you have sent me and I 
configure my jobmanager like this:

jobmanager.rpc.address: 192.168.99.100
jobmanager.rpc.bind-address: da54c7ceaaa9 # container's host name resolving to 
the 172.x address
jobmanager.rpc.port: 6123
jobmanager.rpc.bind-port: 6123

The jobmanager startup fails with a message like this 

[jira] [Created] (FLINK-4849) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-10-17 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4849:
-

 Summary: keystoreFilePath should be checked against null in 
SSLUtils#createSSLServerContext
 Key: FLINK-4849
 URL: https://issues.apache.org/jira/browse/FLINK-4849
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
String keystoreFilePath = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_KEYSTORE,
    null);
...
try {
    keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File ctor would throw NPE.





[jira] [Commented] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-10-17 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582930#comment-15582930
 ] 

Ted Yu commented on FLINK-4848:
---

There is a similar issue with trustStoreFilePath.



[jira] [Resolved] (FLINK-4849) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-10-17 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-4849.
---
Resolution: Duplicate

Dup of FLINK-4848



[jira] [Created] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-10-17 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4848:
-

 Summary: keystoreFilePath should be checked against null in 
SSLUtils#createSSLServerContext
 Key: FLINK-4848
 URL: https://issues.apache.org/jira/browse/FLINK-4848
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
String keystoreFilePath = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_KEYSTORE,
    null);
...
try {
    keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File ctor would throw NPE.





[jira] [Commented] (FLINK-4510) Always create CheckpointCoordinator

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582860#comment-15582860
 ] 

ASF GitHub Bot commented on FLINK-4510:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2453
  
Thanks for the contribution. I'm going to merge this with slight
modifications. We don't need the introduced checks on the current master, as
there is already a check for whether periodic checkpoints are activated.


> Always create CheckpointCoordinator
> ---
>
> Key: FLINK-4510
> URL: https://issues.apache.org/jira/browse/FLINK-4510
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Jark Wu
>
> The checkpoint coordinator is only created if a checkpointing interval is 
> configured. This means that no savepoints can be triggered if there is no 
> checkpointing interval specified.
> Instead, we should always create it and allow an interval of 0 to mean that
> periodic checkpoints are disabled.
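
Roughly, the proposed change amounts to the following sketch (names and calls assumed for illustration, not the merged code):

{code}
// The coordinator is created unconditionally; an interval of 0 only
// disables the periodic trigger, so savepoints remain possible.
checkpointCoordinator = new CheckpointCoordinator(jobId /* , ... */);
if (checkpointInterval > 0) {
    checkpointCoordinator.startCheckpointScheduler();
}
// Savepoints no longer depend on a configured checkpointing interval:
checkpointCoordinator.triggerSavepoint(System.currentTimeMillis());
{code}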







[jira] [Assigned] (FLINK-4174) Enhance Window Evictor

2016-10-17 Thread vishnu viswanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vishnu viswanath reassigned FLINK-4174:
---

Assignee: vishnu viswanath

> Enhance Window Evictor
> --
>
> Key: FLINK-4174
> URL: https://issues.apache.org/jira/browse/FLINK-4174
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: vishnu viswanath
>Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design 
> document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the
> beginning). To do this, the Evictor must go through the list of elements and
> remove those that have to be evicted, instead of the current approach of
> returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement:
> [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
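
To make the proposal concrete, a sketch of the enhanced {{Evictor}} contract as described in the FLIP (signatures approximate, taken from the design document rather than final code):

{code}
public interface Evictor<T, W extends Window> extends Serializable {

    // Called before the window function; implementations iterate over the
    // elements and call Iterator#remove() on those to evict, so eviction
    // is no longer restricted to a prefix of the window.
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx);

    // Called after the window function has been applied.
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx);
}
{code}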





[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582779#comment-15582779
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83666257
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
 ---
@@ -18,14 +18,18 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.io.Serializable;
 import java.util.Set;
 
 /**
- * Interface for a backend that manages operator state.
+ * This interface contains methods for registering operator state with a managed store.
  */
+@PublicEvolving
 public interface OperatorStateStore {
 
+   /** The default namespace for state in cases where no state name is provided. */
String DEFAULT_OPERATOR_STATE_NAME = "_default_";
--- End diff --

This is an implementation detail that should not be exposed on this 
interface.


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available through
> state backends. However, the serialization code for many operators is built
> around reading/writing their state to a stream for checkpointing. We want to
> provide partitionable state through streams as well, so that migrating
> existing operators becomes easier.
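
A hypothetical sketch of what stream-based ("raw") state could look like from an operator's perspective (method and type names assumed, not taken from the PR):

{code}
public void snapshotState(StateSnapshotContext context) throws Exception {
    // some output stream handed out by the checkpointing machinery
    OutputStream rawStream = context.getRawOperatorStateOutput();
    DataOutputView out = new DataOutputViewStreamWrapper(rawStream);

    // the operator keeps its existing stream-based serialization code
    out.writeInt(bufferedElements.size());
    for (Element element : bufferedElements) {
        elementSerializer.serialize(element, out);
    }
}
{code}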





[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582782#comment-15582782
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83673478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SnapshotInProgressSubtaskState.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Encapsulates all runnable futures draw snapshots for a single subtask 
state of an in-flight checkpointing operation.
+ */
+public class SnapshotInProgressSubtaskState {
--- End diff --

I think this could be changed to
```
/**
 * Result of {@link AbstractStreamOperator#snapshotState}.
 */
public class OperatorSnapshotResult { ... }
```

to make it clearer what it is supposed to be. And it should probably
be in the same module/package as `AbstractStreamOperator`, but the code layout
of the state classes seems a bit messy, so I'm not sure if that's possible.





[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582774#comment-15582774
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83664806
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -48,7 +43,7 @@
  * {@link AbstractRichFunction#getRuntimeContext()}.
  */
 @Public
-public interface RuntimeContext {
+public interface RuntimeContext extends KeyedStateStore {
--- End diff --

I think it would be better to not have `RuntimeContext` be a 
`KeyedStateStore`. In the not-so-far future `RuntimeContext` will probably 
provide a `KeyedStateStore` or at least use one internally to implement the 
state methods. Properly separating the two now seems prudent.




[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582784#comment-15582784
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83676027
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractPartitionedCheckpointOutputStream.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream that allows writing state into several partitions.
+ * @param <T> type of the returned state handle.
+ */
+public abstract class AbstractPartitionedCheckpointOutputStream<T extends StreamStateHandle> extends OutputStream {
--- End diff --

I think the javadoc and class name don't accurately describe what this does 
(possibly due to some refactoring). Now it should probably be called something 
like `NonClosingCheckpointOutputStream`.




[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582776#comment-15582776
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83670574
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 ---
@@ -38,8 +38,8 @@
private final StreamStateHandle delegateStateHandle;
 
public OperatorStateHandle(
-   StreamStateHandle delegateStateHandle,
-   Map<String, long[]> stateNameToPartitionOffsets) {
+   Map<String, long[]> stateNameToPartitionOffsets,
--- End diff --

This only changes the ordering, but it triggers one-line changes in other 
files that make it hard to tell which changes are real changes. Could you 
maybe revert that and do it in a follow-up?


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83680393
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -2132,24 +2132,36 @@ public void 
testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex
"non-partitioned state changed.");
}
 
+   @Test
--- End diff --

Very good additions! 👍 😺 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83685681
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 ---
@@ -199,30 +228,56 @@ public Environment getEnvironment() {
}
 
/**
-* Calls
-* {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
 */
public void setup() throws Exception {
operator.setup(mockTask, config, new MockOutput());
setupCalled = true;
}
 
/**
-* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
-* calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
 * if it was not called before.
 */
-   public void open() throws Exception {
+   public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
if (!setupCalled) {
setup();
}
+   operator.initializeState(operatorStateHandles);
+   initializeCalled = true;
+   }
+
+   /**
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}.
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}
 if it
+* was not called before.
+*/
+   public void open() throws Exception {
+   if (!initializeCalled) {
+   initializeState(null);
+   }
operator.open();
}
 
/**
 *
 */
-   public StreamStateHandle snapshot(long checkpointId, long timestamp) 
throws Exception {
+   public SnapshotInProgressSubtaskState snapshot(long checkpointId, long 
timestamp) throws Exception {
--- End diff --

I think we can keep the old method signature by doing something like this:
```
/**
 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
 */
public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
    synchronized (checkpointLock) {
        CheckpointStreamFactory.CheckpointStateOutputStream outStream =
                stateBackend.createStreamFactory(new JobID(), "test_op")
                        .createCheckpointStateOutputStream(checkpointId, timestamp);

        if (operator instanceof StreamCheckpointedOperator) {
            ((StreamCheckpointedOperator) operator).snapshotState(
                    outStream,
                    checkpointId,
                    timestamp);
        }

        RunnableFuture<OperatorStateHandle> snapshotRunnable = operator.snapshotState(
                checkpointId,
                timestamp,
                stateBackend.createStreamFactory(new JobID(), "test_op"));

        if (snapshotRunnable != null) {
            outStream.write(1);
            snapshotRunnable.run();
            OperatorStateHandle operatorStateHandle = snapshotRunnable.get();

            InstantiationUtil.serializeObject(outStream, operatorStateHandle);
        } else {
            outStream.write(0);
        }

        snapshotToStream(checkpointId, timestamp, outStream);

        return outStream.closeAndGetHandle();
    }
}
```
This multiplexes the results from the different operator snapshotting 
methods into the same stream. The restore method just tweezes out the correct 
items from the stream and hands them to the correct operator methods.

This would let all tests use the same method and we can keep the 
name/signature the same if we evolve the operator/snapshot interfaces.
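
For symmetry, the restore side would have to demultiplex the same stream. A 
rough sketch under the same assumptions (the wrapper constructor and the 
`restoreState(InputStream)` hook are assumed names, not the actual harness API):
```
public void restore(StreamStateHandle stateHandle) throws Exception {
    try (InputStream inStream = stateHandle.openInputStream()) {
        // legacy stream-checkpointed state was written first
        if (operator instanceof StreamCheckpointedOperator) {
            ((StreamCheckpointedOperator) operator).restoreState(inStream);
        }
        // the marker byte says whether a partitionable state handle follows
        if (inStream.read() == 1) {
            OperatorStateHandle operatorStateHandle =
                    InstantiationUtil.deserializeObject(inStream, getClass().getClassLoader());
            // hypothetical wrapper; the real OperatorStateHandles constructor differs
            operator.initializeState(new OperatorStateHandles(operatorStateHandle));
        }
    }
}
```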


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83673478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SnapshotInProgressSubtaskState.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Encapsulates all runnable futures that draw snapshots for a single subtask 
state of an in-flight checkpointing operation.
+ */
+public class SnapshotInProgressSubtaskState {
--- End diff --

I think this could be changed to
```
/**
 * Result of {@link AbstractStreamOperator#snapshotState}.
 */
public class OperatorSnapshotResult { ... }
```

to make it clearer what it is supposed to be. It should probably also be in 
the same module/package as `AbstractStreamOperator`, but the code layout of 
the state classes seems a bit messy, so I'm not sure that's possible.
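
A fleshed-out version of that suggestion might look like this (a sketch; the 
two futures are assumptions based on the imports of the original class, 
`KeyGroupsStateHandle` for raw keyed state and `OperatorStateHandle` for 
operator state):
```
import java.util.concurrent.RunnableFuture;

import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;

/**
 * Result of {@link AbstractStreamOperator#snapshotState}: one future per
 * kind of snapshotted state.
 */
public class OperatorSnapshotResult {

    private final RunnableFuture<KeyGroupsStateHandle> keyedStateFuture;
    private final RunnableFuture<OperatorStateHandle> operatorStateFuture;

    public OperatorSnapshotResult(
            RunnableFuture<KeyGroupsStateHandle> keyedStateFuture,
            RunnableFuture<OperatorStateHandle> operatorStateFuture) {
        this.keyedStateFuture = keyedStateFuture;
        this.operatorStateFuture = operatorStateFuture;
    }

    public RunnableFuture<KeyGroupsStateHandle> getKeyedStateFuture() {
        return keyedStateFuture;
    }

    public RunnableFuture<OperatorStateHandle> getOperatorStateFuture() {
        return operatorStateFuture;
    }
}
```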



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582792#comment-15582792
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83680393
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 ---
@@ -2132,24 +2132,36 @@ public void 
testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex
"non-partitioned state changed.");
}
 
+   @Test
--- End diff --

Very good additions! 👍 😺 


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582777#comment-15582777
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83673691
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
 ---
@@ -34,10 +37,14 @@
 
private static final long serialVersionUID = -2394696997971923995L;
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(SubtaskState.class);
-
-   /** The state of the parallel operator */
-   private final ChainedStateHandle<StreamStateHandle> chainedStateHandle;
+   /**
+* The state of the parallel operator
+*/
+   private final ChainedStateHandle<StreamStateHandle> nonPartitionableOperatorState;
--- End diff --

I think these names don't match the names in the rest of the code base.


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83682543
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -238,11 +294,51 @@ public void dispose() throws Exception {
}
 
@Override
-   public RunnableFuture<OperatorStateHandle> snapshotState(
+   public SnapshotInProgressSubtaskState snapshotState(
--- End diff --

This should probably be `final`, similarly to how 
`initializeState(OperatorStateHandles)` is `final`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582786#comment-15582786
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83674157
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.util.CollectionUtil;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class encapsulates all state handles for a task.
+ */
+public class TaskStateHandles implements Serializable {
--- End diff --

Very good addition for simplifying the handling of all the state handles 
that are flying around.   


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582796#comment-15582796
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83685681
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 ---
@@ -199,30 +228,56 @@ public Environment getEnvironment() {
}
 
/**
-* Calls
-* {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
 */
public void setup() throws Exception {
operator.setup(mockTask, config, new MockOutput());
setupCalled = true;
}
 
/**
-* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
-* calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
 * if it was not called before.
 */
-   public void open() throws Exception {
+   public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
if (!setupCalled) {
setup();
}
+   operator.initializeState(operatorStateHandles);
+   initializeCalled = true;
+   }
+
+   /**
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}.
+* Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}
 if it
+* was not called before.
+*/
+   public void open() throws Exception {
+   if (!initializeCalled) {
+   initializeState(null);
+   }
operator.open();
}
 
/**
 *
 */
-   public StreamStateHandle snapshot(long checkpointId, long timestamp) 
throws Exception {
+   public SnapshotInProgressSubtaskState snapshot(long checkpointId, long 
timestamp) throws Exception {
--- End diff --

I think we can keep the old method signature by doing something like this:
```
/**
 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
 */
public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
    synchronized (checkpointLock) {
        CheckpointStreamFactory.CheckpointStateOutputStream outStream =
                stateBackend.createStreamFactory(new JobID(), "test_op")
                        .createCheckpointStateOutputStream(checkpointId, timestamp);

        if (operator instanceof StreamCheckpointedOperator) {
            ((StreamCheckpointedOperator) operator).snapshotState(
                    outStream,
                    checkpointId,
                    timestamp);
        }

        RunnableFuture<OperatorStateHandle> snapshotRunnable = operator.snapshotState(
                checkpointId,
                timestamp,
                stateBackend.createStreamFactory(new JobID(), "test_op"));

        if (snapshotRunnable != null) {
            outStream.write(1);
            snapshotRunnable.run();
            OperatorStateHandle operatorStateHandle = snapshotRunnable.get();

            InstantiationUtil.serializeObject(outStream, operatorStateHandle);
        } else {
            outStream.write(0);
        }

        snapshotToStream(checkpointId, timestamp, outStream);

        return outStream.closeAndGetHandle();
    }
}
```
This multiplexes the results from the different operator snapshotting 
methods into the same stream. The restore method just tweezes out the correct 
items from the stream and hands them to the correct operator methods.

This would let all tests use the same method and we can keep the 
name/signature the same if we evolve the operator/snapshot interfaces.

[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83673691
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
 ---
@@ -34,10 +37,14 @@
 
private static final long serialVersionUID = -2394696997971923995L;
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(SubtaskState.class);
-
-   /** The state of the parallel operator */
-   private final ChainedStateHandle<StreamStateHandle> chainedStateHandle;
+   /**
+* The state of the parallel operator
+*/
+   private final ChainedStateHandle<StreamStateHandle> nonPartitionableOperatorState;
--- End diff --

I think these names don't match the names in the rest of the code base.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582794#comment-15582794
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83679673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.RunnableFuture;
+
+public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext {
--- End diff --

Some Javadocs would probably be helpful.  


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582791#comment-15582791
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83679164
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsList.java 
---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public interface KeyGroupsList extends Iterable<Integer> {
--- End diff --

This one could benefit from some Javadocs.
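
For instance, something along these lines (a sketch; the two methods are 
illustrative assumptions, not the actual interface):
```
/**
 * A read-only view on a set of key groups, e.g. the key-group range that is
 * assigned to one parallel operator instance. Iteration yields the indices
 * of the contained key groups.
 */
public interface KeyGroupsList extends Iterable<Integer> {

    /** Returns the number of key groups in this list. */
    int getNumberOfKeyGroups();

    /** Returns true iff the given key-group index is contained. */
    boolean contains(int keyGroupId);
}
```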


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582793#comment-15582793
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83682543
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -238,11 +294,51 @@ public void dispose() throws Exception {
}
 
@Override
-   public RunnableFuture<OperatorStateHandle> snapshotState(
+   public SnapshotInProgressSubtaskState snapshotState(
--- End diff --

This should probably be `final`, similarly to how 
`initializeState(OperatorStateHandles)` is `final`.


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582787#comment-15582787
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83677603
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -132,7 +133,7 @@ public DefaultOperatorStateBackend(ClassLoader 
userClassLoader) {
}
 
/**
-* @see SnapshotProvider
+* @see Snapshotable
--- End diff --

I think an empty Javadoc simply prevents tools from displaying the Javadoc 
of the overridden method, so it's probably best to remove those. There are 
also more instances of that in this file.
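
For example (made-up classes; the `{@inheritDoc}` behavior itself is standard 
Javadoc):
```
interface Snapshotable {
    /** Takes a snapshot of the current state. */
    void snapshot();
}

class DefaultBackend implements Snapshotable {

    // A near-empty Javadoc like "/** @see Snapshotable */" would replace the
    // inherited description with essentially nothing. Either omit the comment
    // entirely, or inherit explicitly:
    /** {@inheritDoc} */
    @Override
    public void snapshot() {
        // ...
    }
}
```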


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582785#comment-15582785
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83676729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
 ---
@@ -25,6 +25,13 @@
 import java.util.HashSet;
 import java.util.Set;
 
+/**
+ * This class allows to register instances of {@link Closeable}, which are 
all closed if this registry is closed.
+ * 
--- End diff --

The correct way of separating paragraphs in Javadoc is this:
```
Paragraph one.

<p>Paragraph two

<p>...
```

I know it's not proper HTML nowadays but that's how it's supposed to be ... 
 


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582788#comment-15582788
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83678862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * This interface provides a context in which operators that use managed 
state (i.e. state that is managed by state
+ * backends) can perform a snapshot. As snapshots of the backends 
themselves are taken by the system, this interface
+ * mainly provides meta information about the checkpoint.
+ */
+@PublicEvolving
+public interface ManagedSnapshotContext {
--- End diff --

Same comments as for `ManagedInitializationContext` hold here.


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582789#comment-15582789
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83678698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
+
+/**
+ * This interface provides a context in which operators can initialize by 
registering to managed state (i.e. state that
+ * is managed by state backends).
+ * 
+ *
+ * Operator state is available to all operators, while keyed state is only 
available for operators after keyBy.
+ * 
+ *
+ * For the purpose of initialization, the context signals if the state is 
empty (new operator) or was restored from
+ * a previous execution of this operator.
+ *
+ */
+public interface ManagedInitializationContext {
--- End diff --

I think this interface and its sub interfaces/implementations should be in 
the same module as `AbstractStreamOperator` and somewhere in the api package 
space.

Also, the naming could be changed to something like 
`StateInitializationContext` -> `FunctionInitializationContext` -> 
`OperatorInitializationContext`, or something else reflecting their purpose, 
but `StateInitializationContext` should be at the bottom of the hierarchy.
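
One possible reading of that suggestion, with `StateInitializationContext` as 
the most derived ("bottom") interface (bodies elided; purely illustrative):
```
// Purely illustrative; bodies elided. Package-private so the sketch
// compiles as a single file.
interface OperatorInitializationContext {
    boolean isRestored();
}

interface FunctionInitializationContext extends OperatorInitializationContext {
    // state stores exposed to user functions would live here
}

interface StateInitializationContext extends FunctionInitializationContext {
    // raw state streams for operators would live here
}
```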


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582795#comment-15582795
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83683321
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class holds all state handles for one operator.
+ */
+public class OperatorStateHandles {
--- End diff --

This should be `@Internal` or at least `@PublicEvolving`. Also, the name 
clashes a bit with `OperatorStateHandle` which does something quite different.
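
Concretely, that could be as simple as the following (a sketch; `@Internal` is 
Flink's existing annotation for non-public API):
```
import org.apache.flink.annotation.Internal;

/**
 * This class holds all state handles for one operator.
 */
@Internal
public class OperatorStateHandles {
    // ... body unchanged ...
}
```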


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582775#comment-15582775
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83663696
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -305,39 +306,42 @@ public void close() throws Exception {
super.close();
}
}
-   
+
// 

//  Checkpoint and restore
// 

 
-   @Override
-   public void initializeState(OperatorStateStore stateStore) throws 
Exception {
 
-   this.stateStore = stateStore;
+   @Override
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {
 
-   ListState offsets =
-   
stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+   OperatorStateStore stateStore = 
context.getManagedOperatorStateStore();
+   offsetsStateForCheckpoint = 
stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
-   restoreToOffset = new HashMap<>();
+   if (context.isRestored()) {
+   restoreToOffset = new HashMap<>();
+   for (Serializable serializable : 
offsetsStateForCheckpoint.get()) {
+   @SuppressWarnings("unchecked")
+   Tuple2 kafkaOffset = 
(Tuple2) serializable;
+   restoreToOffset.put(kafkaOffset.f0, 
kafkaOffset.f1);
+   }
 
-   for (Serializable serializable : offsets.get()) {
-   @SuppressWarnings("unchecked")
-   Tuple2 kafkaOffset = 
(Tuple2) serializable;
-   restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+   LOG.info("Setting restore state in the 
FlinkKafkaConsumer.");
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Using the following offsets: {}", 
restoreToOffset);
+   }
+   } else {
+   LOG.info("No restore state for FlinkKafkaConsumer.");
}
-
-   LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoreToOffset);
}
 
@Override
-   public void prepareSnapshot(long checkpointId, long timestamp) throws 
Exception {
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
if (!running) {
LOG.debug("storeOperatorState() called on closed 
source");
} else {
 
-   ListState listState =
-   
stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
-   listState.clear();
+   offsetsStateForCheckpoint.clear();
 
final AbstractFetcher fetcher = this.kafkaFetcher;
if (fetcher == null) {
--- End diff --

This is a workaround for the fact that we initialise the fetcher in `run()` 
and not in `open()`. Might be worthwhile to change that in a follow-up, if at 
all possible.


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582780#comment-15582780
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83661983
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -55,20 +56,20 @@
 /**
  * Base class of all Flink Kafka Consumer data sources.
  * This implements the common behavior across all Kafka versions.
- * 
+ *
--- End diff --

This file contains a lot of whitespace changes. It would be good to remove 
them before we merge this.


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582778#comment-15582778
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83661373
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -348,6 +345,11 @@ public void prepareSnapshot(long checkpointId, long 
timestamp) throws Exception
}
}
 
+   @Override
--- End diff --

The methods don't need to be reordered here. Also, the state store is not 
used anywhere, as far as I can see.


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582781#comment-15582781
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83669355
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState(
 
LOG.info("Restoring from latest valid checkpoint: {}.", 
latest);
 
-   for (Map.Entry 
taskGroupStateEntry: latest.getTaskStates().entrySet()) {
-   TaskState taskState = 
taskGroupStateEntry.getValue();
-   ExecutionJobVertex executionJobVertex = 
tasks.get(taskGroupStateEntry.getKey());
-
-   if (executionJobVertex != null) {
-   // check that the number of key groups 
have not changed
-   if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
-   throw new 
IllegalStateException("The maximum parallelism (" +
-   
taskState.getMaxParallelism() + ") with which the latest " +
-   "checkpoint of the 
execution job vertex " + executionJobVertex +
-   " has been taken and 
the current maximum parallelism (" +
-   
executionJobVertex.getMaxParallelism() + ") changed. This " +
-   "is currently not 
supported.");
-   }
-
-
-   int oldParallelism = 
taskState.getParallelism();
-   int newParallelism = 
executionJobVertex.getParallelism();
-   boolean parallelismChanged = 
oldParallelism != newParallelism;
-   boolean hasNonPartitionedState = 
taskState.hasNonPartitionedState();
-
-   if (hasNonPartitionedState && 
parallelismChanged) {
-   throw new 
IllegalStateException("Cannot restore the latest checkpoint because " +
-   "the operator " + 
executionJobVertex.getJobVertexId() + " has non-partitioned " +
-   "state and its 
parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
-   " has parallelism " + 
newParallelism + " whereas the corresponding" +
-   "state object has a 
parallelism of " + oldParallelism);
-   }
-
-   List keyGroupPartitions 
= createKeyGroupPartitions(
-   
executionJobVertex.getMaxParallelism(),
-   newParallelism);
-   
-   // operator chain index -> list of the 
stored partitionables states from all parallel instances
-   @SuppressWarnings("unchecked")
-   List[] 
chainParallelStates =
-   new 
List[taskState.getChainLength()];
-
-   for (int i = 0; i < oldParallelism; 
++i) {
-
-   
ChainedStateHandle partitionableState =
-   
taskState.getPartitionableState(i);
-
-   if (partitionableState != null) 
{
-   for (int j = 0; j < 
partitionableState.getLength(); ++j) {
-   
OperatorStateHandle opParalleState = partitionableState.get(j);
-   if 
(opParalleState != null) {
-   
List opParallelStates =
-   
chainParallelStates[j];
-   if 
(opParallelStates == null) {
-   

[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83683321
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class holds all state handles for one operator.
+ */
+public class OperatorStateHandles {
--- End diff --

This should be `@Internal` or at least `@PublicEvolving`. Also, the name 
clashes a bit with `OperatorStateHandle` which does something quite different.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582783#comment-15582783
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83662569
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -80,38 +81,38 @@
// 

 
private final List topics;
-   
+
/** The schema to convert between Kafka's byte messages, and Flink's 
objects */
protected final KeyedDeserializationSchema deserializer;
 
/** The set of topic partitions that the source will read */
protected List subscribedPartitions;
-   
+
/** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
 * to exploit per-partition timestamp characteristics.
 * The assigner is kept in serialized form, to deserialize it into 
multiple copies */
private SerializedValue 
periodicWatermarkAssigner;
-   
+
/** Optional timestamp extractor / watermark generator that will be run 
per Kafka partition,
-* to exploit per-partition timestamp characteristics. 
+* to exploit per-partition timestamp characteristics.
 * The assigner is kept in serialized form, to deserialize it into 
multiple copies */
private SerializedValue 
punctuatedWatermarkAssigner;
 
-   private transient OperatorStateStore stateStore;
+   private transient ListState offsetsStateForCheckpoint;
--- End diff --

This can have a more concrete type. You changed 
`OperatorStateStore.getSerializableListState` to this:
```
<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
```
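
A more concrete declaration could then look like this (a sketch; the element 
type is inferred from the restore code above, which casts each element to a 
`Tuple2` of `KafkaTopicPartition` and `Long`):
```
private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
```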


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582790#comment-15582790
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83680077
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class NonClosingStreamDecorator extends InputStream {
--- End diff --

It's quite clear what it does but Javadocs would still be nice.


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> state backends. However, the serialization code for many operators is built 
> around reading/writing their state to a stream for checkpointing. We want to 
> provide partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83680077
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class NonClosingStreamDecorator extends InputStream {
--- End diff --

It's quite clear what it does but Javadocs would still be nice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83676027
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractPartitionedCheckpointOutputStream.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream that allows to write state into several partitions.
+ * @param <T> type of the returned state handle.
+ */
+public abstract class AbstractPartitionedCheckpointOutputStream<T> extends OutputStream {
--- End diff --

I think the javadoc and class name don't accurately describe what this does 
(possibly due to some refactoring). Now it should probably be called something 
like `NonClosingCheckpointOutputStream`.
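
For illustration, a minimal sketch of such a non-closing decorator (names illustrative, not the PR's exact code):
```
import java.io.IOException;
import java.io.OutputStream;

// Sketch: forwards all writes to a delegate but ignores close(), so that
// user code cannot accidentally close the underlying checkpoint stream.
class NonClosingOutputStreamSketch extends OutputStream {

	private final OutputStream delegate;

	NonClosingOutputStreamSketch(OutputStream delegate) {
		this.delegate = delegate;
	}

	@Override
	public void write(int b) throws IOException {
		delegate.write(b);
	}

	@Override
	public void close() {
		// intentionally a no-op: the checkpointing machinery closes the
		// delegate itself when the snapshot is finalized
	}
}
```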


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83662569
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -80,38 +81,38 @@
// 

 
	private final List<String> topics;
-	
+
	/** The schema to convert between Kafka's byte messages, and Flink's objects */
	protected final KeyedDeserializationSchema<T> deserializer;

	/** The set of topic partitions that the source will read */
	protected List<KafkaTopicPartition> subscribedPartitions;
-	
+
	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
	 * to exploit per-partition timestamp characteristics.
	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
	private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
-	
+
	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
-	 * to exploit per-partition timestamp characteristics. 
+	 * to exploit per-partition timestamp characteristics.
	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

-	private transient OperatorStateStore stateStore;
+	private transient ListState<Serializable> offsetsStateForCheckpoint;
--- End diff --

This can have a more concrete type. You changed 
`OperatorStateStore.getSerializableListState` to this:
```
<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
```
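
For illustration, a sketch of what a more concrete field type could look like, assuming offsets are stored as `Tuple2` of partition and offset (as the unchecked cast in `initializeState` suggests):
```
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// Sketch only: with the generic accessor above, the field can carry the
// element type instead of plain Serializable, avoiding the unchecked cast.
class OffsetStateSketch {
	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
}
```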


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83677603
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -132,7 +133,7 @@ public DefaultOperatorStateBackend(ClassLoader 
userClassLoader) {
}
 
/**
-* @see SnapshotProvider
+* @see Snapshotable
--- End diff --

I think an empty Javadoc simply prevents tools from displaying the Javadoc 
of the overridden method, so it's probably best to remove those. There are 
more instances of that in this file.
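
For illustration, a minimal example of the effect (hypothetical interface, not Flink's actual `Snapshotable` signature):
```
// Hypothetical example: the first class inherits the interface docs, while
// the second class's near-empty Javadoc hides them in generated documentation.
interface SnapshotableSketch {
	/** Snapshots the state managed by this backend. */
	void snapshot();
}

class WellDocumented implements SnapshotableSketch {
	@Override
	public void snapshot() { } // no Javadoc: tools show the inherited docs
}

class EmptyJavadoc implements SnapshotableSketch {
	/** @see SnapshotableSketch */
	@Override
	public void snapshot() { } // this Javadoc hides the inherited description
}
```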


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83679164
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsList.java 
---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+public interface KeyGroupsList extends Iterable<Integer> {
--- End diff --

This one could benefit from some Javadocs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83661373
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -348,6 +345,11 @@ public void prepareSnapshot(long checkpointId, long 
timestamp) throws Exception
}
}
 
+   @Override
--- End diff --

The methods don't need to be reordered here. Also, the state store is not 
used anywhere, as far as I can see.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83661983
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -55,20 +56,20 @@
 /**
  * Base class of all Flink Kafka Consumer data sources.
  * This implements the common behavior across all Kafka versions.
- * 
+ *
--- End diff --

This file contains a lot of whitespace changes. It would be good to remove 
them before we merge this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83679673
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.RunnableFuture;
+
+public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext {
--- End diff --

Some Javadocs would probably be helpful. 😉 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83666257
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
 ---
@@ -18,14 +18,18 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.io.Serializable;
 import java.util.Set;
 
 /**
- * Interface for a backend that manages operator state.
+ * This interface contains methods for registering operator state with a 
managed store.
  */
+@PublicEvolving
 public interface OperatorStateStore {
 
+   /** The default namespace for state in cases where no state name is 
provided */
String DEFAULT_OPERATOR_STATE_NAME = "_default_";
--- End diff --

This is an implementation detail that should not be exposed on this 
interface.
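
For illustration, a sketch of the suggested cleanup (class layout assumed): the constant moves into the backend implementation, leaving the interface free of implementation details:
```
import java.io.Serializable;

import org.apache.flink.api.common.state.ListState;

// Sketch: the interface only exposes the accessors, while the default
// state name stays in the implementation.
interface OperatorStateStoreSketch {
	<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
}

class DefaultOperatorStateBackendSketch implements OperatorStateStoreSketch {

	// implementation detail, no longer visible to users of the interface
	static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

	@Override
	public <T extends Serializable> ListState<T> getSerializableListState(String stateName) {
		throw new UnsupportedOperationException("sketch only");
	}
}
```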


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83678862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * This interface provides a context in which operators that use managed 
state (i.e. state that is managed by state
+ * backends) can perform a snapshot. As snapshots of the backends 
themselves are taken by the system, this interface
+ * mainly provides meta information about the checkpoint.
+ */
+@PublicEvolving
+public interface ManagedSnapshotContext {
--- End diff --

Same comments as for `ManagedInitializationContext` hold here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83678698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
+
+/**
+ * This interface provides a context in which operators can initialize by 
registering to managed state (i.e. state that
+ * is managed by state backends).
+ * 
+ *
+ * Operator state is available to all operators, while keyed state is only 
available for operators after keyBy.
+ * 
+ *
+ * For the purpose of initialization, the context signals if the state is 
empty (new operator) or was restored from
+ * a previous execution of this operator.
+ *
+ */
+public interface ManagedInitializationContext {
--- End diff --

I think this interface and its sub interfaces/implementations should be in 
the same module as `AbstractStreamOperator` and somewhere in the api package 
space.

Also, the naming could be changed to something like 
`StateInitializationContext` -> `FunctionInitializationContext` -> 
`OperatorInitializationContext`. Or something reflecting their purpose but 
`StateInitializationContext` should be at the bottom of the hierarchy.
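
For illustration, one way such a hierarchy could be layered, using the names proposed above (the methods shown are assumptions):
```
// Most general context at the top; the state-specific context at the bottom.
interface OperatorInitializationContext {
	// signals whether state was restored from a previous execution
	boolean isRestored();
}

interface FunctionInitializationContext extends OperatorInitializationContext {
	// what user functions would see, e.g. access to managed operator state
}

interface StateInitializationContext extends FunctionInitializationContext {
	// the most specific context, e.g. raw keyed/operator state streams
}
```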


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83676729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
 ---
@@ -25,6 +25,13 @@
 import java.util.HashSet;
 import java.util.Set;
 
+/**
+ * This class allows to register instances of {@link Closeable}, which are 
all closed if this registry is closed.
+ * 
--- End diff --

The correct way of separating paragraphs in Javadoc is this:
```
Paragraph one.

<p>Paragraph two

...
```

I know it's not proper HTML nowadays but that's how it's supposed to be ... 
😜 
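
Applied to the class under review, that convention would look roughly like this (wording illustrative):
```
/**
 * This class allows to register instances of {@link java.io.Closeable}.
 *
 * <p>All registered instances are closed when the registry itself is closed.
 */
class ClosableRegistrySketch { }
```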


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83669355
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState(
 
LOG.info("Restoring from latest valid checkpoint: {}.", 
latest);
 
-   for (Map.Entry 
taskGroupStateEntry: latest.getTaskStates().entrySet()) {
-   TaskState taskState = 
taskGroupStateEntry.getValue();
-   ExecutionJobVertex executionJobVertex = 
tasks.get(taskGroupStateEntry.getKey());
-
-   if (executionJobVertex != null) {
-   // check that the number of key groups 
have not changed
-   if (taskState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
-   throw new 
IllegalStateException("The maximum parallelism (" +
-   
taskState.getMaxParallelism() + ") with which the latest " +
-   "checkpoint of the 
execution job vertex " + executionJobVertex +
-   " has been taken and 
the current maximum parallelism (" +
-   
executionJobVertex.getMaxParallelism() + ") changed. This " +
-   "is currently not 
supported.");
-   }
-
-
-   int oldParallelism = 
taskState.getParallelism();
-   int newParallelism = 
executionJobVertex.getParallelism();
-   boolean parallelismChanged = 
oldParallelism != newParallelism;
-   boolean hasNonPartitionedState = 
taskState.hasNonPartitionedState();
-
-   if (hasNonPartitionedState && 
parallelismChanged) {
-   throw new 
IllegalStateException("Cannot restore the latest checkpoint because " +
-   "the operator " + 
executionJobVertex.getJobVertexId() + " has non-partitioned " +
-   "state and its 
parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
-   " has parallelism " + 
newParallelism + " whereas the corresponding" +
-   "state object has a 
parallelism of " + oldParallelism);
-   }
-
-   List keyGroupPartitions 
= createKeyGroupPartitions(
-   
executionJobVertex.getMaxParallelism(),
-   newParallelism);
-   
-   // operator chain index -> list of the 
stored partitionables states from all parallel instances
-   @SuppressWarnings("unchecked")
-   List[] 
chainParallelStates =
-   new 
List[taskState.getChainLength()];
-
-   for (int i = 0; i < oldParallelism; 
++i) {
-
-   
ChainedStateHandle partitionableState =
-   
taskState.getPartitionableState(i);
-
-   if (partitionableState != null) 
{
-   for (int j = 0; j < 
partitionableState.getLength(); ++j) {
-   
OperatorStateHandle opParalleState = partitionableState.get(j);
-   if 
(opParalleState != null) {
-   
List opParallelStates =
-   
chainParallelStates[j];
-   if 
(opParallelStates == null) {
-   
opParallelStates = new ArrayList<>();
-   
chainParallelStates[j] = opParallelStates;
-   }
-   

[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83664806
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -48,7 +43,7 @@
  * {@link AbstractRichFunction#getRuntimeContext()}.
  */
 @Public
-public interface RuntimeContext {
+public interface RuntimeContext extends KeyedStateStore {
--- End diff --

I think it would be better to not have `RuntimeContext` be a 
`KeyedStateStore`. In the not-so-far future `RuntimeContext` will probably 
provide a `KeyedStateStore` or at least use one internally to implement the 
state methods. Properly separating the two now seems prudent.
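
For illustration, a sketch of the composition-over-inheritance idea (the accessor name is an assumption):
```
import org.apache.flink.api.common.state.KeyedStateStore;

// Sketch: instead of RuntimeContext being a KeyedStateStore, it provides one.
interface RuntimeContextSketch {
	// hypothetical accessor; user functions ask the context for the store
	KeyedStateStore getKeyedStateStore();
}
```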


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83674157
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.util.CollectionUtil;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class encapsulates all state handles for a task.
+ */
+public class TaskStateHandles implements Serializable {
--- End diff --

Very good addition for simplifying the handling of all the state handles 
that are flying around. 👍 😄 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83670574
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 ---
@@ -38,8 +38,8 @@
private final StreamStateHandle delegateStateHandle;
 
public OperatorStateHandle(
-   StreamStateHandle delegateStateHandle,
-   Map stateNameToPartitionOffsets) {
+   Map stateNameToPartitionOffsets,
--- End diff --

This is only changing the ordering, but it triggers one-line changes in 
other files that make it hard to tell which changes are real changes. 
Could you maybe revert that and change it in a follow-up?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2648#discussion_r83663696
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -305,39 +306,42 @@ public void close() throws Exception {
super.close();
}
}
-	
+
	// ------------------------------------------------------------------------
	//  Checkpoint and restore
	// ------------------------------------------------------------------------

-	@Override
-	public void initializeState(OperatorStateStore stateStore) throws Exception {

-		this.stateStore = stateStore;
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {

-		ListState<Serializable> offsets =
-				stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+		OperatorStateStore stateStore = context.getManagedOperatorStateStore();
+		offsetsStateForCheckpoint = stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);

-		restoreToOffset = new HashMap<>();
+		if (context.isRestored()) {
+			restoreToOffset = new HashMap<>();
+			for (Serializable serializable : offsetsStateForCheckpoint.get()) {
+				@SuppressWarnings("unchecked")
+				Tuple2<KafkaTopicPartition, Long> kafkaOffset = (Tuple2<KafkaTopicPartition, Long>) serializable;
+				restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+			}

-		for (Serializable serializable : offsets.get()) {
-			@SuppressWarnings("unchecked")
-			Tuple2<KafkaTopicPartition, Long> kafkaOffset = (Tuple2<KafkaTopicPartition, Long>) serializable;
-			restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+			LOG.info("Setting restore state in the FlinkKafkaConsumer.");
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Using the following offsets: {}", restoreToOffset);
+			}
+		} else {
+			LOG.info("No restore state for FlinkKafkaConsumer.");
		}
-
-		LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoreToOffset);
	}

	@Override
-	public void prepareSnapshot(long checkpointId, long timestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		if (!running) {
			LOG.debug("storeOperatorState() called on closed source");
		} else {

-			ListState<Serializable> listState =
-					stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
-			listState.clear();
+			offsetsStateForCheckpoint.clear();

			final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
			if (fetcher == null) {
--- End diff --

This is a workaround for the fact that we initialise the fetcher in `run()` 
and not in `open()`. Might be worthwhile to change that in a follow-up, if at 
all possible.
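
For illustration, a self-contained sketch of this workaround, with types simplified to plain maps (all names here are assumptions, and the `snapshotCurrentState()` accessor is hypothetical):
```
import java.util.HashMap;
import java.util.Map;

// Simplified sketch: the fetcher is created in run(), not open(), so a
// checkpoint taken before run() must fall back to the restored offsets.
class KafkaSourceSketch {

	interface FetcherSketch {
		// hypothetical accessor for the fetcher's current offsets
		Map<String, Long> snapshotCurrentState();
	}

	private volatile FetcherSketch kafkaFetcher;              // set in run()
	private Map<String, Long> restoreToOffset = new HashMap<>();
	private final Map<String, Long> offsetsStateForCheckpoint = new HashMap<>();

	void snapshotState() {
		offsetsStateForCheckpoint.clear();

		FetcherSketch fetcher = this.kafkaFetcher;
		if (fetcher == null) {
			// checkpoint triggered before run(): re-emit the restored offsets
			offsetsStateForCheckpoint.putAll(restoreToOffset);
		} else {
			offsetsStateForCheckpoint.putAll(fetcher.snapshotCurrentState());
		}
	}
}
```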


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582735#comment-15582735
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2652

[FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck

- Splits the cancellation up into two threads:
    * The `TaskCanceler` calls `cancel` on the invokable and `interrupt` on 
the executing Thread. It then exits.
    * The `TaskCancellationWatchDog` kicks in after the task cancellation 
timeout (current default: 30 secs) and periodically calls `interrupt` on the 
executing Thread. If the Thread does not terminate within the task cancellation 
timeout (new config value, default 3 mins), the task manager is notified about 
a fatal error, leading to termination of the JVM (see the sketch below).
- The new configuration is exposed via 
`ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS`
  (default: 3 mins) and the `ExecutionConfig` (similar to the cancellation 
interval).
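
For illustration, a rough, self-contained sketch of this two-phase scheme (thread names, parameters, and the escalation hook are assumptions, not the PR's actual code):
```
// Sketch of the two-phase cancellation: a one-shot canceler interrupts the
// task; a watchdog keeps interrupting and escalates after the timeout.
class CancellationWatchdogSketch {

	static void cancelTask(final Thread executingThread, final Runnable fatalErrorHandler,
			final long interruptIntervalMillis, final long timeoutMillis) {

		// phase 1: a one-shot canceler that interrupts the running task and exits
		new Thread(() -> executingThread.interrupt(), "TaskCanceler").start();

		// phase 2: a watchdog that keeps interrupting and escalates after the timeout
		new Thread(() -> {
			long deadline = System.currentTimeMillis() + timeoutMillis;
			while (executingThread.isAlive() && System.currentTimeMillis() < deadline) {
				executingThread.interrupt();
				try {
					Thread.sleep(interruptIntervalMillis);
				} catch (InterruptedException ignored) {
				}
			}
			if (executingThread.isAlive()) {
				// task is stuck: report a fatal error so the JVM can be terminated
				fatalErrorHandler.run();
			}
		}, "TaskCancellationWatchDog").start();
	}
}
```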

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4715-suicide

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2652






> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...

2016-10-17 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2652

[FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck

- Splits the cancellation up into two threads:
* The `TaskCanceler` calls `cancel` on the invokable and `interrupt` on 
the executing Thread. It then exits.
   * The `TaskCancellationWatchDog` kicks in after the task cancellation 
timeout (current default: 30 secs) and periodically calls `interrupt` on the 
executing Thread. If the Thread does not terminate within the task cancellation 
timeout (new config value, default 3 mins), the task manager is notified about 
a fatal error, leading to termination of the JVM.
- The new configuration is exposed via 
`ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS`
  (default: 3 mins) and the `ExecutionConfig` (similar to the cancellation 
interval).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4715-suicide

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2652






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582606#comment-15582606
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2211
  
Ah, thanks for pointing out the dependency on #2094. I wasn't aware of 
that. 
Will try to push that PR further, then :-).


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

2016-10-17 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2211
  
Ah, thanks for pointing out the dependency on #2094. I wasn't aware of 
that. 
Will try to push that PR further, then :-).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-10-17 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582605#comment-15582605
 ] 

Maximilian Michels commented on FLINK-2821:
---

Thanks for testing [~philipp.bussche]. There was an issue with the dependency 
management of my custom build. I've resolved the issue and would like to invite 
you to try again: http://people.apache.org/~mxm/flink-1.1.3-custom-akka.zip

With the new version, you should be able to run your containers. Next, we 
should resolve why you can't use the external address of the Rancher container 
and bind to the internal container address. What kind of error do you get?

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to match exactly.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyway, because it's affecting our users' satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582596#comment-15582596
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user Xazax-hun commented on the issue:

https://github.com/apache/flink/pull/2211
  
Hi!
This patch depends on the following pull request: 
https://github.com/apache/flink/pull/2094
Once it is landed I will remove the [WIP] tag. I did not remove it yet 
because I did not want the reviewer to review changes that were not made by me. 


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

2016-10-17 Thread Xazax-hun
Github user Xazax-hun commented on the issue:

https://github.com/apache/flink/pull/2211
  
Hi!
This patch depends on the following pull request: 
https://github.com/apache/flink/pull/2094
Once it is landed I will remove the [WIP] tag. I did not remove it yet 
because I did not want the reviewer to review changes that were not made by me. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2651: [FLINK-4847] Let RpcEndpoint.start/shutDown throw ...

2016-10-17 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2651

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing RpcEndpoint.start/shutDown to throw exceptions helps rpc endpoints 
fail quickly without having to use a callback like the FatalErrorHandler.
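
In signature terms, the change amounts to roughly the following (simplified sketch, not the actual class):
```
// Sketch: checked exceptions propagate to the caller, which can fail the
// component immediately instead of going through a FatalErrorHandler callback.
abstract class RpcEndpointSketch {

	public void start() throws Exception {
		// endpoint-specific startup; may fail fast
	}

	public void shutDown() throws Exception {
		// endpoint-specific teardown; may fail fast
	}
}
```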

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink rpcExceptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2651.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2651


commit ddf35c4ddb04629cddebb2401488effe93416b70
Author: Till Rohrmann 
Date:   2016-10-17T14:22:16Z

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing RpcEndpoint.start/shutDown to throw exceptions helps rpc endpoints 
fail quickly without having to use a callback like the FatalErrorHandler.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4847) Let RpcEndpoint.start/shutDown throw exceptions

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582405#comment-15582405
 ] 

ASF GitHub Bot commented on FLINK-4847:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2651

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing RpcEndpoint.start/shutDown to throw exceptions helps rpc endpoints 
fail quickly without having to use a callback like the FatalErrorHandler.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink rpcExceptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2651.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2651


commit ddf35c4ddb04629cddebb2401488effe93416b70
Author: Till Rohrmann 
Date:   2016-10-17T14:22:16Z

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing RpcEndpoint.start/shutDown to throw exceptions helps rpc endpoints 
fail quickly without having to use a callback like the FatalErrorHandler.




> Let RpcEndpoint.start/shutDown throw exceptions
> ---
>
> Key: FLINK-4847
> URL: https://issues.apache.org/jira/browse/FLINK-4847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{RpcEndpoint.start}} and {{RpcEndpoint.shutDown}} methods should be 
> allowed to throw exceptions if things go wrong. Otherwise, exceptions will be 
> given to a callback which handles them later, even though we know that we can 
> fail the components right away (as is the case for the {{TaskExecutor}}, 
> for example).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4847) Let RpcEndpoint.start/shutDown throw exceptions

2016-10-17 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4847:


 Summary: Let RpcEndpoint.start/shutDown throw exceptions
 Key: FLINK-4847
 URL: https://issues.apache.org/jira/browse/FLINK-4847
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Reporter: Till Rohrmann


The {{RpcEndpoint.start}} and {{RpcEndpoint.shutDown}} methods should be 
allowed to throw exceptions if things go wrong. Otherwise, exceptions will be 
given to a callback which handles them later, even though we know that we can 
fail the components right away (as is the case for the {{TaskExecutor}}, for 
example).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2498: [FLINK-4619] - JobManager does not answer to client when ...

2016-10-17 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2498
  
Yes, I merge this later today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4619) JobManager does not answer to client when restore from savepoint fails

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582373#comment-15582373
 ] 

ASF GitHub Bot commented on FLINK-4619:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2498
  
Yes, I merge this later today.


> JobManager does not answer to client when restore from savepoint fails
> --
>
> Key: FLINK-4619
> URL: https://issues.apache.org/jira/browse/FLINK-4619
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: Maciej Prochniak
> Fix For: 1.2.0, 1.1.3
>
>
> When the savepoint used is incompatible with the currently deployed process, the 
> job manager never answers the client (jobInfo.notifyClients is not invoked in 
> one of the try-catch blocks)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4847) Let RpcEndpoint.start/shutDown throw exceptions

2016-10-17 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-4847:


Assignee: Till Rohrmann

> Let RpcEndpoint.start/shutDown throw exceptions
> ---
>
> Key: FLINK-4847
> URL: https://issues.apache.org/jira/browse/FLINK-4847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{RpcEndpoint.start}} and {{RpcEndpoint.shutDown}} methods should be 
> allowed to throw exceptions if things go wrong. Otherwise, exceptions will be 
> given to a callback which handles them later, even though we know that we can 
> fail the components right away (as is the case for the {{TaskExecutor}}, 
> for example).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4839) JobManager handle TaskManager's slot offering

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582341#comment-15582341
 ] 

ASF GitHub Bot commented on FLINK-4839:
---

Github user KurtYoung closed the pull request at:

https://github.com/apache/flink/pull/2647


> JobManager handle TaskManager's slot offering
> -
>
> Key: FLINK-4839
> URL: https://issues.apache.org/jira/browse/FLINK-4839
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> JobManager receives the TaskManager's slot offers, and decides which slots to 
> accept.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2647: [FLINK-4839] [cluster management] JobManager handle TaskM...

2016-10-17 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2647
  
Thanks for your review, @tillrohrmann. Closing it now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4839) JobManager handle TaskManager's slot offering

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582342#comment-15582342
 ] 

ASF GitHub Bot commented on FLINK-4839:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2647
  
Thanks for your review, @tillrohrmann. Closing it now.


> JobManager handle TaskManager's slot offering
> -
>
> Key: FLINK-4839
> URL: https://issues.apache.org/jira/browse/FLINK-4839
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> JobManager receives the TaskManager's slot offers, and decides which slots to 
> accept.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2647: [FLINK-4839] [cluster management] JobManager handl...

2016-10-17 Thread KurtYoung
Github user KurtYoung closed the pull request at:

https://github.com/apache/flink/pull/2647


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4840) Introduce an OperatorSystemMetricGroup

2016-10-17 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582332#comment-15582332
 ] 

Stephan Ewen commented on FLINK-4840:
-

To me, throughput and latency could be in the I/O scope, because throughput is 
the number of records in/out per unit of time.

> Introduce an OperatorSystemMetricGroup
> --
>
> Key: FLINK-4840
> URL: https://issues.apache.org/jira/browse/FLINK-4840
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> This will introduce the OperatorSystemMetricGroup, which encapsulates the 
> instantiation of the TPS meter and the latency/proc_time_cost histograms. 
> Operator-related system metrics are not instantiated directly by the specific 
> operator, but instead within the OperatorSystemMetricGroup contained in the 
> respective OperatorMetricGroup. They are then later accessed by the relevant 
> components (maybe in different places), instead of being instantiated 
> identically with static name constants. Other system scope metrics (maybe 
> delay/queue_in/queue_out) can be added to the OperatorSystemMetricGroup later.
> TPS: collects records per second (StreamSource), or processed elements per 
> second (other operators)
> latency/proc_time_cost: collects the time cost of a record (StreamSource), or 
> of a processed element (other operators)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2646: [FLINK-4843] Test for FsCheckpointStateOutputStream::getP...

2016-10-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2646
  
Thanks for the update, looks good!

+1 to merge this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4843) Introduce Test for FsCheckpointStateOutputStream::getPos

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582323#comment-15582323
 ] 

ASF GitHub Bot commented on FLINK-4843:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2646
  
Thanks for the update, looks good!

+1 to merge this


> Introduce Test for FsCheckpointStateOutputStream::getPos
> 
>
> Key: FLINK-4843
> URL: https://issues.apache.org/jira/browse/FLINK-4843
> Project: Flink
>  Issue Type: Test
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Introduce a test for FsCheckpointStateOutputStream::getPos, which is 
> currently not included in the tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4840) Introduce an OperatorSystemMetricGroup

2016-10-17 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582316#comment-15582316
 ] 

Chesnay Schepler commented on FLINK-4840:
-

recordsPerSecond will be added to the OperatorIOMetricGroup once the 
MetricViews are merged (https://issues.apache.org/jira/browse/FLINK-3950).

The processing time metric seems a bit redundant; can't you infer it from the 
number of records processed per second?



> Introduce an OperatorSystemMetricGroup
> --
>
> Key: FLINK-4840
> URL: https://issues.apache.org/jira/browse/FLINK-4840
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> This will introduce the OperatorSystemMetricGroup, which encapsulates the 
> instantiation of the TPS meter and the latency/proc_time_cost histograms. 
> Operator-related system metrics are not instantiated directly by the specific 
> operator, but instead within the OperatorSystemMetricGroup contained in the 
> respective OperatorMetricGroup. They are then later accessed by the relevant 
> components (maybe in different places), instead of being instantiated 
> identically with static name constants. Other system scope metrics (maybe 
> delay/queue_in/queue_out) can be added to the OperatorSystemMetricGroup later.
> TPS: collects records per second (StreamSource), or processed elements per 
> second (other operators)
> latency/proc_time_cost: collects the time cost of a record (StreamSource), or 
> of a processed element (other operators)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4839) JobManager handle TaskManager's slot offering

2016-10-17 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4839.
--
Resolution: Fixed

Fixed via 4f891a6c26847ac66c477853101de31eb75993f7

> JobManager handle TaskManager's slot offering
> -
>
> Key: FLINK-4839
> URL: https://issues.apache.org/jira/browse/FLINK-4839
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> JobManager receives the TaskManager's slot offers, and decides which slots to 
> accept.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4510) Always create CheckpointCoordinator

2016-10-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582301#comment-15582301
 ] 

ASF GitHub Bot commented on FLINK-4510:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2453
  
Looks good to me now.


> Always create CheckpointCoordinator
> ---
>
> Key: FLINK-4510
> URL: https://issues.apache.org/jira/browse/FLINK-4510
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Jark Wu
>
> The checkpoint coordinator is only created if a checkpointing interval is 
> configured. This means that no savepoints can be triggered if there is no 
> checkpointing interval specified.
> Instead we should always create it and allow an interval of 0 for disabled 
> periodic checkpoints. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2453: [FLINK-4510] [checkpoint] Always create CheckpointCoordin...

2016-10-17 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2453
  
Looks good to me now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4846) FlinkML - Pass "env" as an implicit parameter in MLUtils.readLibSVM

2016-10-17 Thread Thomas FOURNIER (JIRA)
Thomas FOURNIER created FLINK-4846:
--

 Summary: FlinkML - Pass "env" as an implicit parameter in 
MLUtils.readLibSVM
 Key: FLINK-4846
 URL: https://issues.apache.org/jira/browse/FLINK-4846
 Project: Flink
  Issue Type: Improvement
Reporter: Thomas FOURNIER
Priority: Minor


With FlinkML you can import a file via MLUtils.readLibSVM (import 
org.apache.flink.ml.MLUtils)

For example:

val env = ExecutionEnvironment.getExecutionEnvironment
val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM(env, 
"src/main/resources/svmguide1")

I'd like to pass "env" as an implicit parameter and use the method as such:

val astroTrain: DataSet[LabeledVector] = 
MLUtils.readLibSVM("src/main/resources/svmguide1")

Is it ok (not a scala specialist yet :) ) ?




 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

