[jira] [Comment Edited] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584364#comment-15584364 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-4723 at 10/18/16 3:58 AM:
----------------------------------------------------------------------

[~rmetzger] Should this fix go into release-1.1 as well? It will affect the behaviour of committed offsets for Kafka 0.8 users, so I'm not quite sure.

was (Author: tzulitai):
[~rmetzger] Should this fix go into release-1.1 as well?

> Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9
> consumer
> ------------------------------------------------------------------------
>
>                 Key: FLINK-4723
>                 URL: https://issues.apache.org/jira/browse/FLINK-4723
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0, 1.1.3
>
> The proper "behaviour" of the offsets committed back to Kafka / ZK should be
> "the next offset that consumers should read (in Kafka terms, the 'position')".
> This is already fixed for the 0.9 consumer by FLINK-4618, by incrementing the
> offsets committed back to Kafka by the 0.9 consumer by 1, so that the internal
> {{KafkaConsumer}} picks up the correct start position when committed offsets
> are present. This fix was required because the start position from committed
> offsets was implicitly determined with Kafka 0.9 APIs.
> However, since the 0.8 consumer handles offset committing and start position
> using Flink's own {{ZookeeperOffsetHandler}} and not Kafka's high-level APIs,
> the 0.8 consumer did not require a fix.
> I propose to still unify the behaviour of committed offsets across 0.8 and
> 0.9 to the definition above.
> Otherwise, if a user first uses the 0.8 consumer to read data and has
> Flink-committed offsets in ZK, and then uses a high-level 0.8 Kafka
> consumer to read the same topic in a non-Flink application, the first record
> will be a duplicate (because, as described above, Kafka high-level consumers
> expect the committed offsets to be "the next record to process" and not "the
> last processed record").
> This requires the committed ZK offsets in 0.8 to also be incremented by 1,
> and changing how Flink internal offsets are initialized in accordance with
> the acquired ZK offsets.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
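The off-by-one convention the issue describes can be made concrete with a minimal sketch. This is illustrative only, not actual Flink source; the class and method names are invented for this example. The point is simply that a committed "position" is the last processed offset plus one.

```java
// Illustrative helper (not Flink code): the offset committed back to Kafka/ZK
// should be the *next* offset to read, i.e. the last processed offset + 1.
public class OffsetCommitSemantics {

    /** Offset to commit after the record at lastProcessedOffset was processed. */
    static long offsetToCommit(long lastProcessedOffset) {
        // Kafka high-level consumers interpret the committed offset as the
        // "position": the next record to fetch, not the last one processed.
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        // After processing records 0..41, a non-Flink high-level consumer
        // resuming from the committed offset should start at record 42.
        System.out.println(offsetToCommit(41));
    }
}
```

Without the +1, a consumer resuming from the committed offset re-reads the last processed record, which is exactly the duplicate described above.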
[jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584364#comment-15584364 ]

Tzu-Li (Gordon) Tai commented on FLINK-4723:
--------------------------------------------

[~rmetzger] Should this fix go into release-1.1 as well?
[jira] [Resolved] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai resolved FLINK-4723.
----------------------------------------
    Resolution: Fixed
[jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584361#comment-15584361 ]

Tzu-Li (Gordon) Tai commented on FLINK-4723:
--------------------------------------------

Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/f46ca39
[GitHub] flink issue #2585: [FLINK-4727] [kafka-connector] Set missing initial offset...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2585 Rebasing to include f46ca39 with the IT tests de-commented. Will merge when Travis turns green. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read
[ https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584340#comment-15584340 ]

ASF GitHub Bot commented on FLINK-4727:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2585

    Rebasing to include f46ca39 with the IT tests de-commented. Will merge when Travis turns green.

> Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no
> data is read
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-4727
>                 URL: https://issues.apache.org/jira/browse/FLINK-4727
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.3
>
> This is basically the 0.9 version counterpart for FLINK-3440.
> When the 0.9 consumer fetches initial offsets from Kafka on startup, but does
> not have any data to read, it should also checkpoint & commit these initial
> offsets.
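The behaviour FLINK-4727 asks for can be sketched as follows. All names here are hypothetical stand-ins, not actual Flink source: the idea is that offsets auto-retrieved from Kafka at startup seed the checkpointed state immediately, so a snapshot taken before any record is read still carries (and therefore commits) those initial offsets.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of checkpointable offset state: seeded with the offsets
// fetched from Kafka on startup, so snapshots are non-empty even before any
// record has been read.
public class OffsetState {
    private final Map<Integer, Long> offsets = new HashMap<>(); // partition -> offset

    /** Called once on startup with the offsets auto-retrieved from Kafka. */
    void seedInitialOffsets(Map<Integer, Long> initial) {
        offsets.putAll(initial);
    }

    /** Called for every processed record. */
    void advance(int partition, long offset) {
        offsets.put(partition, offset);
    }

    /** Snapshot for checkpointing; these offsets get committed on completion. */
    Map<Integer, Long> snapshot() {
        return new HashMap<>(offsets);
    }
}
```

Without the seeding step, a checkpoint taken on an idle partition would contain no offset for it, and nothing would be committed back, which is the bug described above.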
[jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584332#comment-15584332 ]

ASF GitHub Bot commented on FLINK-4723:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2580
[GitHub] flink pull request #2580: [FLINK-4723] [kafka-connector] Unify committed off...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2580
[GitHub] flink issue #2518: [FLINK-3931] Implement Transport Encryption (SSL/TLS)
Github user skrishnappa commented on the issue: https://github.com/apache/flink/pull/2518 Thanks @mxm for following this up. Cheers
[jira] [Commented] (FLINK-3931) Implement Transport Encryption (SSL/TLS)
[ https://issues.apache.org/jira/browse/FLINK-3931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584329#comment-15584329 ]

ASF GitHub Bot commented on FLINK-3931:
---------------------------------------

Github user skrishnappa commented on the issue:

    https://github.com/apache/flink/pull/2518

    Thanks @mxm for following this up. Cheers

> Implement Transport Encryption (SSL/TLS)
> ----------------------------------------
>
>                 Key: FLINK-3931
>                 URL: https://issues.apache.org/jira/browse/FLINK-3931
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Eron Wright
>            Assignee: Suresh Krishnappa
>              Labels: security
>             Fix For: 1.2.0
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> _This issue is part of a series of improvements detailed in the [Secure Data
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
> design doc._
> To assure privacy and data integrity between Flink components, enable TLS for
> all communication channels. As described in the design doc:
> - Accept a configured certificate or generate a certificate.
> - Enable Akka SSL
> - Implement Data Transfer SSL
> - Implement Blob Server SSL
> - Implement Web UI HTTPS
[jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
[ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584323#comment-15584323 ]

ASF GitHub Bot commented on FLINK-4723:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2580

    Merging this to master now ...
[GitHub] flink issue #2580: [FLINK-4723] [kafka-connector] Unify committed offsets to...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2580 Merging this to master now ...
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584306#comment-15584306 ]

ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/2653

    Hi @twalthr @fhueske, it would be great if you could review this.

> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row
> and output a single output row. In contrast, table-generating functions
> transform a single input row into multiple output rows. This is very useful in
> some cases, such as looking up rows in HBase by rowkey and returning one or more rows.
> A user defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods
> NOTE:
> 1. the eval method must be public and non-static.
> 2. eval should always return java.lang.Iterable or scala.collection.Iterable
> with the generic type T.
> 3. the generic type T is the row type returned by the table function. Because of
> Java type erasure, we can't extract T from the Iterable.
> 4. the eval method can be overloaded. Blink will choose the best matching eval
> method to call according to parameter types and number.
> {code}
> public class Word {
>     public String word;
>     public Integer length;
>
>     public Word(String word, Integer length) {
>         this.word = word;
>         this.length = length;
>     }
> }
>
> public class SplitStringUDTF extends UDTF<Word> {
>     public Iterable<Word> eval(String str) {
>         if (str == null) {
>             return new ArrayList<>();
>         } else {
>             List<Word> list = new ArrayList<>();
>             for (String s : str.split(",")) {
>                 list.add(new Word(s, s.length()));
>             }
>             return list;
>         }
>     }
> }
>
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)")
>
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to "w" and "l"
> table.crossApply("split(c)", "w, l")
>      .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case class/...
> table.crossApply("split(c)")
>      .select("a, b, word, length")
>
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c), 'w, 'l)
>      .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> Here we introduce CROSS/OUTER APPLY keywords to join table functions, as
> used in SQL Server. We can discuss the API in the comments.
> Maybe the {{UDTF}} class should be replaced by {{TableFunction}} or something
> else, because we have introduced {{ScalarFunction}} for custom functions and
> we need to stay consistent. Although, I prefer {{UDTF}} over
> {{TableFunction}}, as the former is more SQL-like and the latter may be
> confused with DataStream functions.
> **This issue is blocked by CALCITE-1309, so we need to wait for Calcite to fix
> this and release.**
> See [1] for more information about the UDTF design.
> [1]
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
[GitHub] flink issue #2653: [FLINK-4469] [table] Add support for user defined table f...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/2653 Hi @twalthr @fhueske, it would be great if you could review this.
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584302#comment-15584302 ]

ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

GitHub user wuchong opened a pull request:

    https://github.com/apache/flink/pull/2653

    [FLINK-4469] [table] Add support for user defined table function in Table API & SQL

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

    This PR introduces user-defined table functions for the Table and SQL API. I will add documentation after this proposal is accepted.

    This is the general syntax so far:

    In Java:

    ```java
    public class Split extends TableFunction<String> {
        public void eval(String str) {
            for (String s : str.split(" ")) {
                collect(s);
            }
        }
    }

    tableEnv.registerFunction("split", new Split());

    // cross apply
    Table result = table.crossApply("split(c)", "s").select("c, s");
    Table result = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable CROSS APPLY split(c) as t(s)");

    // outer apply
    Table result = table.outerApply("split(c)", "s").select("c, s");
    Table result = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable OUTER APPLY split(c) as t(s)");
    ```

    In Scala:

    ```scala
    object Split extends TableFunction[String] {
      def eval(str: String): Unit = {
        str.split(" ").foreach(collect)
      }
    }

    // cross apply
    val result = table.crossApply(Split('c) as ('s)).select('c, 's)
    tableEnv.registerFunction("split", Split)
    val result = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable CROSS APPLY split(c) as t(s)")

    // outer apply
    val result = table.outerApply(Split('c) as ('s)).select('c, 's)
    val result = tableEnv.sql("SELECT MyTable.c, t.s FROM MyTable OUTER APPLY split(c) as t(s)")
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wuchong/flink udtf-FLINK-4469

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2653.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2653

commit 60812e51156ec9fa6088154d2f6dea8c1ff9ac17
Author: Jark Wu
Date:   2016-10-18T03:15:07Z

    [FLINK-4469] [table] Add support for user defined table function in Table API & SQL
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
GitHub user wuchong opened a pull request:

    https://github.com/apache/flink/pull/2653

    [FLINK-4469] [table] Add support for user defined table function in Table API & SQL
[jira] [Commented] (FLINK-4315) Deprecate Hadoop dependent methods in flink-java
[ https://issues.apache.org/jira/browse/FLINK-4315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15584198#comment-15584198 ]

Evgeny Kincharov commented on FLINK-4315:
-----------------------------------------

I have pushed a new version of the PR. Could you take a look? BR, Evgeny

> Deprecate Hadoop dependent methods in flink-java
> ------------------------------------------------
>
>                 Key: FLINK-4315
>                 URL: https://issues.apache.org/jira/browse/FLINK-4315
>             Project: Flink
>          Issue Type: Task
>          Components: Java API
>            Reporter: Stephan Ewen
>            Assignee: Evgeny Kincharov
>             Fix For: 2.0.0
>
> The API projects should be independent of Hadoop, because Hadoop is not an
> integral part of the Flink stack, and we should have the option to offer
> Flink without Hadoop dependencies.
> The current batch APIs have a hard dependency on Hadoop, mainly because the
> API has utility methods like `readHadoopFile(...)`.
> I suggest to deprecate those methods and add helpers in the
> `flink-hadoop-compatibility` project.
> FLINK-4048 will later remove the deprecated methods.
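The deprecation pattern proposed in FLINK-4315 can be sketched like this. All class and method names below are simplified stand-ins, not the actual Flink API: the old method keeps its behaviour but is marked `@Deprecated` and delegates to a helper that would live in flink-hadoop-compatibility, so FLINK-4048 can later delete it.

```java
// Hypothetical stand-in for the helper FLINK-4315 proposes to add to
// flink-hadoop-compatibility.
public class HadoopCompatHelpers {
    static String readHadoopFile(String path) {
        return "reading " + path; // real code would construct a Hadoop input format
    }
}

// Hypothetical stand-in for the existing flink-java API surface.
class ApiSketch {
    /**
     * @deprecated Use the helper in flink-hadoop-compatibility instead;
     *             FLINK-4048 will remove this method.
     */
    @Deprecated
    public String readHadoopFile(String path) {
        // Behaviour is unchanged for existing users; only the annotation
        // and Javadoc steer them toward the new helper.
        return HadoopCompatHelpers.readHadoopFile(path);
    }
}
```

Deprecate-then-delegate keeps existing programs compiling (with a warning) for one release cycle before the hard Hadoop dependency is dropped from flink-java.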
[jira] [Updated] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext
[ https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-4849:
--------------------------
    Description:
{code}
String trustStorePassword = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
    null);
...
try {
    trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
    trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
{code}
If trustStorePassword is null, the load() call would throw NPE.

  was:
{code}
String keystoreFilePath = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_KEYSTORE,
    null);
...
try {
    keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}
If keystoreFilePath is null, the File ctor would throw NPE.

> trustStorePassword should be checked against null in
> SSLUtils#createSSLClientContext
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-4849
>                 URL: https://issues.apache.org/jira/browse/FLINK-4849
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
>
> {code}
> String trustStorePassword = sslConfig.getString(
>     ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD,
>     null);
> ...
> try {
>     trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
>     trustStore.load(trustStoreFile, trustStorePassword.toCharArray());
> {code}
> If trustStorePassword is null, the load() call would throw NPE.
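One possible shape of the null guard this issue asks for, sketched with a hypothetical helper (this is not the actual Flink patch): `KeyStore.load()` accepts a null `char[]` password, so a missing configured password can be mapped to null instead of calling `toCharArray()` on a null reference.

```java
// Hypothetical helper (not Flink source) avoiding the NPE from
// trustStorePassword.toCharArray() when no password is configured.
public class TrustStorePassword {

    /** Returns null when no password is configured; KeyStore.load accepts null. */
    static char[] passwordChars(String configured) {
        return configured == null ? null : configured.toCharArray();
    }
}
```

With such a helper, the failing call becomes `trustStore.load(trustStoreFile, passwordChars(trustStorePassword))`; passing null merely skips the keystore integrity check rather than throwing.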
[jira] [Issue Comment Deleted] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext
[ https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4849: -- Comment: was deleted (was: Dup of FLINK-4848) > trustStorePassword should be checked against null in > SSLUtils#createSSLClientContext > > > Key: FLINK-4849 > URL: https://issues.apache.org/jira/browse/FLINK-4849 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > String keystoreFilePath = sslConfig.getString( > ConfigConstants.SECURITY_SSL_KEYSTORE, > null); > ... > try { > keyStoreFile = new FileInputStream(new File(keystoreFilePath)); > {code} > If keystoreFilePath is null, the File ctor would throw NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext
[ https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu reopened FLINK-4849: --- > trustStorePassword should be checked against null in > SSLUtils#createSSLClientContext > > > Key: FLINK-4849 > URL: https://issues.apache.org/jira/browse/FLINK-4849 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > String keystoreFilePath = sslConfig.getString( > ConfigConstants.SECURITY_SSL_KEYSTORE, > null); > ... > try { > keyStoreFile = new FileInputStream(new File(keystoreFilePath)); > {code} > If keystoreFilePath is null, the File ctor would throw NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4849) trustStorePassword should be checked against null in SSLUtils#createSSLClientContext
[ https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4849: -- Summary: trustStorePassword should be checked against null in SSLUtils#createSSLClientContext (was: keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext) > trustStorePassword should be checked against null in > SSLUtils#createSSLClientContext > > > Key: FLINK-4849 > URL: https://issues.apache.org/jira/browse/FLINK-4849 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > String keystoreFilePath = sslConfig.getString( > ConfigConstants.SECURITY_SSL_KEYSTORE, > null); > ... > try { > keyStoreFile = new FileInputStream(new File(keystoreFilePath)); > {code} > If keystoreFilePath is null, the File ctor would throw NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs
[ https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582974#comment-15582974 ] Philipp von dem Bussche commented on FLINK-2821: Thanks [~mxm], I am no longer getting this exception; however, I don't think this is working yet. I should admit that I had to change my test environment slightly, since I am currently travelling. I don't have access to the Rancher environment at the moment, so I am simply bringing up a Docker container on my Mac within a (non-native) docker-machine: a VirtualBox virtual machine on my Mac runs the Docker daemon, and my containers run inside that VM. I do believe this test environment is quite similar to my initial test with Rancher. I have exposed port 6123 from the Docker container to the host (i.e. the virtual machine). This happens on my non-customized 1.1.3 build (not the one you created for me): I am trying to reach my Flink jobmanager's RPC address (a simple flink list from my Mac) like this: PHILIPPs-MacBook:~ philipp$ flink list --jobmanager 192.168.99.100:6123 # 192.168.99.100 is the docker host's IP / the IP of the virtual machine. After a while I get this error message: Retrieving JobManager. Using address /192.168.99.100:6123 to connect to JobManager. 
The program finished with the following exception: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:127) at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:644) at org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:868) at org.apache.flink.client.CliFrontend.list(CliFrontend.java:387) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1008) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190) at scala.concurrent.Await.result(package.scala) at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:125) ... 5 more And in my Flink's jobmanager log file I am seeing this error message: 2016-10-17 17:58:46,088 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka.tcp://flink@172.17.0.2:6123/user/jobmanager. 2016-10-17 17:58:46,108 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka.tcp://flink@172.17.0.2:6123/user/jobmanager 2016-10-17 17:58:46,132 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka.tcp://flink@172.17.0.2:6123/user/jobmanager was granted leadership with leader session ID None. 
2016-10-17 17:58:46,140 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager#-1164381512] - leader session null 2016-10-17 17:59:34,896 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@192.168.99.100:6123/]] arriving at [akka.tcp://flink@192.168.99.100:6123] inbound addresses are [akka.tcp://flink@172.17.0.2:6123] 2016-10-17 17:59:45,052 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@192.168.99.1:51492] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. I would think that the difference between this and the Rancher approach would be that Rancher introduces this third IP address (10.x) which gets used when using the Rancher DNS name between containers in a Rancher environment. Anyways when I am using the custom version that you have sent me and I configure my jobmanager like this: jobmanager.rpc.address: 192.168.99.100 jobmanager.rpc.bind-address: da54c7ceaaa9 # container's host name resolving to the 172.x address jobmanager.rpc.port: 6123 jobmanager.rpc.bind-port: 6123 The jobmanager startup fails with a message like this
[jira] [Created] (FLINK-4849) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
Ted Yu created FLINK-4849: - Summary: keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext Key: FLINK-4849 URL: https://issues.apache.org/jira/browse/FLINK-4849 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor {code} String keystoreFilePath = sslConfig.getString( ConfigConstants.SECURITY_SSL_KEYSTORE, null); ... try { keyStoreFile = new FileInputStream(new File(keystoreFilePath)); {code} If keystoreFilePath is null, the File ctor would throw NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
[ https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582930#comment-15582930 ] Ted Yu commented on FLINK-4848: --- There is a similar issue with trustStoreFilePath. > keystoreFilePath should be checked against null in > SSLUtils#createSSLServerContext > -- > > Key: FLINK-4848 > URL: https://issues.apache.org/jira/browse/FLINK-4848 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > String keystoreFilePath = sslConfig.getString( > ConfigConstants.SECURITY_SSL_KEYSTORE, > null); > ... > try { > keyStoreFile = new FileInputStream(new File(keystoreFilePath)); > {code} > If keystoreFilePath is null, the File ctor would throw NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4849) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
[ https://issues.apache.org/jira/browse/FLINK-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-4849. --- Resolution: Duplicate Dup of FLINK-4848 > keystoreFilePath should be checked against null in > SSLUtils#createSSLServerContext > -- > > Key: FLINK-4849 > URL: https://issues.apache.org/jira/browse/FLINK-4849 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > String keystoreFilePath = sslConfig.getString( > ConfigConstants.SECURITY_SSL_KEYSTORE, > null); > ... > try { > keyStoreFile = new FileInputStream(new File(keystoreFilePath)); > {code} > If keystoreFilePath is null, the File ctor would throw NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
Ted Yu created FLINK-4848: - Summary: keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext Key: FLINK-4848 URL: https://issues.apache.org/jira/browse/FLINK-4848 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor {code} String keystoreFilePath = sslConfig.getString( ConfigConstants.SECURITY_SSL_KEYSTORE, null); ... try { keyStoreFile = new FileInputStream(new File(keystoreFilePath)); {code} If keystoreFilePath is null, the File ctor would throw NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4510) Always create CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582860#comment-15582860 ] ASF GitHub Bot commented on FLINK-4510: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2453 Thanks for the contribution. I'm going to merge this with slight modifications. We don't need the introduced checks with the current master as there is a check whether periodic checkpoints are activated. > Always create CheckpointCoordinator > --- > > Key: FLINK-4510 > URL: https://issues.apache.org/jira/browse/FLINK-4510 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Jark Wu > > The checkpoint coordinator is only created if a checkpointing interval is > configured. This means that no savepoints can be triggered if there is no > checkpointing interval specified. > Instead we should always create it and allow an interval of 0 for disabled > periodic checkpoints. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
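The ticket's proposal — always construct the coordinator and treat an interval of 0 as "periodic checkpoints disabled" so that savepoints remain possible — can be sketched like this. The class below is an invented stand-in for illustration, not Flink's actual CheckpointCoordinator API.

```java
public class CheckpointCoordinatorSketch {

    private final long checkpointIntervalMillis;

    /** The coordinator is always constructed, whatever the interval. */
    public CheckpointCoordinatorSketch(long checkpointIntervalMillis) {
        this.checkpointIntervalMillis = checkpointIntervalMillis;
    }

    /** Periodic checkpoints run only when a positive interval is configured. */
    public boolean periodicCheckpointsEnabled() {
        return checkpointIntervalMillis > 0;
    }

    /** Savepoints can always be triggered, because the coordinator always exists. */
    public boolean canTriggerSavepoint() {
        return true;
    }

    public static void main(String[] args) {
        CheckpointCoordinatorSketch disabled = new CheckpointCoordinatorSketch(0);
        System.out.println("periodic=" + disabled.periodicCheckpointsEnabled()
                + " savepoints=" + disabled.canTriggerSavepoint());
    }
}
```

The point of the design is visible in the sketch: disabling the periodic trigger is a property of the interval, not of the coordinator's existence.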
[GitHub] flink issue #2453: [FLINK-4510] [checkpoint] Always create CheckpointCoordin...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2453 Thanks for the contribution. I'm going to merge this with slight modifications. We don't need the introduced checks with the current master as there is a check whether periodic checkpoints are activated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-4174) Enhance Window Evictor
[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vishnu viswanath reassigned FLINK-4174: --- Assignee: vishnu viswanath > Enhance Window Evictor > -- > > Key: FLINK-4174 > URL: https://issues.apache.org/jira/browse/FLINK-4174 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: vishnu viswanath >Assignee: vishnu viswanath > > Enhance the current functionality of Evictor as per this [design > document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit]. > This includes: > - Allow eviction of elements from the window in any order (not only from the > beginning). To do this Evictor must go through the list of elements and > remove the elements that have to be evicted instead of the current approach > of : returning the count of elements to be removed from beginning. > - Allow eviction to be done before/after applying the window function. > FLIP page for this enhancement : > [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
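The first bullet of the enhancement — evicting elements from anywhere in the window by walking an iterator, rather than returning a count of elements to drop from the beginning — can be sketched without Flink dependencies. The class and method names below are invented for illustration; the real FLIP-4 interface differs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class EvictorSketch {

    /**
     * Removes elements anywhere in the window buffer (here: values above a
     * threshold) by mutating it through its iterator. A count-based evictor
     * could only drop a prefix of the buffer.
     */
    static void evictGreaterThan(List<Integer> windowElements, int threshold) {
        Iterator<Integer> it = windowElements.iterator();
        while (it.hasNext()) {
            if (it.next() > threshold) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(1, 5, 2, 7, 3));
        evictGreaterThan(window, 4); // removes 5 and 7, keeping the order of the rest
        System.out.println(window);
    }
}
```

The second bullet (eviction before or after the window function) would then amount to calling such a method at two different points around the function invocation.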
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582779#comment-15582779 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83666257 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java --- @@ -18,14 +18,18 @@ package org.apache.flink.api.common.state; +import org.apache.flink.annotation.PublicEvolving; + import java.io.Serializable; import java.util.Set; /** - * Interface for a backend that manages operator state. + * This interface contains methods for registering operator state with a managed store. */ +@PublicEvolving public interface OperatorStateStore { + /** The default namespace for state in cases where no state name is provided */ String DEFAULT_OPERATOR_STATE_NAME = "_default_"; --- End diff -- This is an implementation detail that should not be exposed on this interface. > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
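The snapshot-to-a-stream idea from the issue description can be illustrated with plain java.io streams standing in for Flink's checkpoint streams; the class and method names are invented, and a fixed-width offset array stands in for real operator state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RawStateStreamSketch {

    /** Writes an offsets array as raw bytes, the way an operator would snapshot to a checkpoint stream. */
    static byte[] snapshot(long[] offsets) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(offsets.length);
            for (long offset : offsets) {
                out.writeLong(offset);
            }
        }
        return bos.toByteArray();
    }

    /** Reads the offsets back, the way an operator would restore from the stream. */
    static long[] restore(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            long[] offsets = new long[in.readInt()];
            for (int i = 0; i < offsets.length; i++) {
                offsets[i] = in.readLong();
            }
            return offsets;
        }
    }

    public static void main(String[] args) throws IOException {
        long[] restored = restore(snapshot(new long[] {10L, 20L, 30L}));
        System.out.println(java.util.Arrays.toString(restored));
    }
}
```

An operator whose serialization logic is already written against streams can migrate to such an interface without restructuring its state around a backend.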
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582782#comment-15582782 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83673478 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SnapshotInProgressSubtaskState.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; + +import java.util.concurrent.RunnableFuture; + +/** + * Encapsulates all runnable futures draw snapshots for a single subtask state of an in-flight checkpointing operation. + */ +public class SnapshotInProgressSubtaskState { --- End diff -- I think this could be changed to ``` /** * Result of {@link AbstractStreamOperator#snapshotState}. */ public class OperatorSnapshotResult { ... } ``` to make it more clearer what it is supposed to be. 
And it should probably be in the same module/package as `AbstractStreamOperator` but the code layout of the state classes seems a bit messy so not sure if it's possible. > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582774#comment-15582774 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83664806 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -48,7 +43,7 @@ * {@link AbstractRichFunction#getRuntimeContext()}. */ @Public -public interface RuntimeContext { +public interface RuntimeContext extends KeyedStateStore { --- End diff -- I think it would be better to not have `RuntimeContext` be a `KeyedStateStore`. In the not-so-far future `RuntimeContext` will probably provide a `KeyedStateStore` or at least use one internally to implement the state methods. Properly separating the two now seems prudent. > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582784#comment-15582784 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83676027 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractPartitionedCheckpointOutputStream.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Output stream that allows to write state into several partitions. + * @param type of the returned state handle. + */ +public abstract class AbstractPartitionedCheckpointOutputStream extends OutputStream { --- End diff -- I think the javadoc and class name don't accurately describe what this does (possibly due to some refactoring). Now it should probably be called something like `NonClosingCheckpointOutputStream`. 
> Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582776#comment-15582776 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83670574 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java --- @@ -38,8 +38,8 @@ private final StreamStateHandle delegateStateHandle; public OperatorStateHandle( - StreamStateHandle delegateStateHandle, - MapstateNameToPartitionOffsets) { + Map stateNameToPartitionOffsets, --- End diff -- This is only changing ordering but it's triggering some one-line changes in other files that make it hard to keep track of what changes are really changes. Could you maybe revert that and change it in a follow-up? > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83680393 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -2132,24 +2132,36 @@ public void testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex "non-partitioned state changed."); } + @Test --- End diff -- Very good additions! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83685681 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java --- @@ -199,30 +228,56 @@ public Environment getEnvironment() { } /** -* Calls -* {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} */ public void setup() throws Exception { operator.setup(mockTask, config, new MockOutput()); setupCalled = true; } /** -* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also -* calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} * if it was not called before. */ - public void open() throws Exception { + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { if (!setupCalled) { setup(); } + operator.initializeState(operatorStateHandles); + initializeCalled = true; + } + + /** +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it +* was not called before. 
+*/ + public void open() throws Exception { + if (!initializeCalled) { + initializeState(null); + } operator.open(); } /** * */ - public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { + public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception { --- End diff -- I think we can keep the old method signature by doing something like this: ``` /** * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}. */ public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { synchronized (checkpointLock) { CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( new JobID(), "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); if (operator instanceof StreamCheckpointedOperator) { ((StreamCheckpointedOperator) operator).snapshotState( outStream, checkpointId, timestamp); } RunnableFuture snapshotRunnable = operator.snapshotState( checkpointId, timestamp, stateBackend.createStreamFactory(new JobID(), "test_op")); if (snapshotRunnable != null) { outStream.write(1); snapshotRunnable.run(); OperatorStateHandle operatorStateHandle = snapshotRunnable.get(); InstantiationUtil.serializeObject(outStream, operatorStateHandle); } else { outStream.write(0); } snapshotToStream(checkpointId, timestamp, outStream); return outStream.closeAndGetHandle(); } } ``` This multiplexes the results from the different operator snapshotting methods into the same stream. The restore method just tweezes out the correct items from the stream and hands them to the correct operator methods. This would let all tests use the same method and we can keep the name/signature the same if we evolve the operator/snapshot interfaces. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83673478 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SnapshotInProgressSubtaskState.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; + +import java.util.concurrent.RunnableFuture; + +/** + * Encapsulates all runnable futures draw snapshots for a single subtask state of an in-flight checkpointing operation. + */ +public class SnapshotInProgressSubtaskState { --- End diff -- I think this could be changed to ``` /** * Result of {@link AbstractStreamOperator#snapshotState}. */ public class OperatorSnapshotResult { ... } ``` to make it more clearer what it is supposed to be. And it should probably be in the same module/package as `AbstractStreamOperator` but the code layout of the state classes seems a bit messy so not sure if it's possible. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582792#comment-15582792 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83680393 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -2132,24 +2132,36 @@ public void testRestoreLatestCheckpointFailureWhenParallelismChanges() throws Ex "non-partitioned state changed."); } + @Test --- End diff -- Very good additions! > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582777#comment-15582777 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83673691 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java --- @@ -34,10 +37,14 @@ private static final long serialVersionUID = -2394696997971923995L; - private static final Logger LOG = LoggerFactory.getLogger(SubtaskState.class); - - /** The state of the parallel operator */ - private final ChainedStateHandle chainedStateHandle; + /** +* The state of the parallel operator +*/ + private final ChainedStateHandle nonPartitionableOperatorState; --- End diff -- I think these names don't match the names in the rest of the code base. > Partitionable Raw Keyed/Operator State > -- > > Key: FLINK-4844 > URL: https://issues.apache.org/jira/browse/FLINK-4844 > Project: Flink > Issue Type: New Feature >Reporter: Stefan Richter >Assignee: Stefan Richter > > Partitionable operator and keyed state are currently only available by using > backends. However, the serialization code for many operators is build around > reading/writing their state to a stream for checkpointing. We want to provide > partitionable states also through streams, so that migrating existing > operators becomes more easy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83682543 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -238,11 +294,51 @@ public void dispose() throws Exception { } @Override - public RunnableFuture snapshotState( + public SnapshotInProgressSubtaskState snapshotState( --- End diff -- This should probably be `final`, similarly to how `initializeState(OperatorStateHandles)` is `final`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
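For context, the `final` suggestion above is the classic template-method pattern: the framework-facing entry point is locked down so subclasses cannot bypass its bookkeeping, and they override only a hook. A minimal sketch, where `StateHandler` and `doSnapshot` are made-up names for illustration and not Flink's actual API:

```java
// Sketch of the template-method pattern behind the "make it final" suggestion.
// StateHandler and doSnapshot are illustrative names, not Flink's API.
abstract class StateHandler {
    // The framework-facing entry point is final, so subclasses cannot
    // skip the fixed bookkeeping that wraps every snapshot.
    public final String snapshotState(long checkpointId) {
        return "checkpoint-" + checkpointId + ":" + doSnapshot();
    }

    // Subclasses customize only this hook.
    protected abstract String doSnapshot();
}

class CounterHandler extends StateHandler {
    private int count = 3;

    @Override
    protected String doSnapshot() {
        return "count=" + count;
    }
}

public class TemplateMethodDemo {
    public static void main(String[] args) {
        StateHandler handler = new CounterHandler();
        System.out.println(handler.snapshotState(42L)); // prints checkpoint-42:count=3
    }
}
```

This mirrors how `initializeState(OperatorStateHandles)` is already `final` on `AbstractStreamOperator`: subclasses hook into the process rather than replace it.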
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582786#comment-15582786 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83674157 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.util.CollectionUtil; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * This class encapsulates all state handles for a task. + */ +public class TaskStateHandles implements Serializable { --- End diff -- Very good addition for simplifying the handling of all the state handles that are flying around. 
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582796#comment-15582796 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83685681 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java --- @@ -199,30 +228,56 @@ public Environment getEnvironment() { } /** -* Calls -* {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} */ public void setup() throws Exception { operator.setup(mockTask, config, new MockOutput()); setupCalled = true; } /** -* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also -* calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} * if it was not called before. */ - public void open() throws Exception { + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { if (!setupCalled) { setup(); } + operator.initializeState(operatorStateHandles); + initializeCalled = true; + } + + /** +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. +* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it +* was not called before. 
+*/ + public void open() throws Exception { + if (!initializeCalled) { + initializeState(null); + } operator.open(); } /** * */ - public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { + public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception { --- End diff -- I think we can keep the old method signature by doing something like this: ``` /** * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}. */ public final StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception { synchronized (checkpointLock) { CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( new JobID(), "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); if (operator instanceof StreamCheckpointedOperator) { ((StreamCheckpointedOperator) operator).snapshotState( outStream, checkpointId, timestamp); } RunnableFuture snapshotRunnable = operator.snapshotState( checkpointId, timestamp, stateBackend.createStreamFactory(new JobID(), "test_op")); if (snapshotRunnable != null) { outStream.write(1); snapshotRunnable.run(); OperatorStateHandle operatorStateHandle = snapshotRunnable.get(); InstantiationUtil.serializeObject(outStream, operatorStateHandle); } else { outStream.write(0); } snapshotToStream(checkpointId, timestamp, outStream); return outStream.closeAndGetHandle(); } } ``` This multiplexes the results from the different operator snapshotting methods into the same stream. The restore method just tweezes out the correct items from the stream and hands them to the correct operator methods. This would let all tests use the same method and we can keep the name/signature the same if we evolve the
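The multiplexing described in the suggestion (write a presence-marker byte, then the optional payload, into one stream, and "tweeze" it back out on restore) can be sketched with plain `java.io`. This is a standalone toy under that reading, not the harness code itself:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MultiplexDemo {
    // Write an optional payload behind a presence marker, as in the suggested
    // snapshot(): 1 means "payload follows", 0 means "payload absent".
    static byte[] write(String payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            if (payload != null) {
                out.writeByte(1);
                out.writeUTF(payload);
            } else {
                out.writeByte(0);
            }
        }
        return bos.toByteArray();
    }

    // The restore side reads the marker first and only then the payload.
    static String read(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readByte() == 1 ? in.readUTF() : null;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(read(write("operator-state"))); // prints operator-state
        System.out.println(read(write(null)));             // prints null
    }
}
```

The benefit, as the comment notes, is that all results travel through one stream with one handle, so the test method signature can stay stable even as the operator snapshotting methods evolve.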
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582794#comment-15582794 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83679673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.concurrent.RunnableFuture; + +public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext { --- End diff -- Some Javadocs would probably be helpful.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582791#comment-15582791 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83679164 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsList.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +public interface KeyGroupsList extends Iterable { --- End diff -- This one could benefit from some Javadocs.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582787#comment-15582787 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83677603 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java --- @@ -132,7 +133,7 @@ public DefaultOperatorStateBackend(ClassLoader userClassLoader) { } /** -* @see SnapshotProvider +* @see Snapshotable --- End diff -- I think an empty Javadoc simply prevents tools from displaying the Javadoc of the overridden method, so it's probably best to remove those. There are also more instances of that in this file.
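To illustrate the point about stub Javadoc on overrides: a near-empty comment replaces the inherited text in generated docs, while omitting the comment lets tools fall back to the superclass documentation. The `Snapshotable` interface below is a simplified stand-in for illustration, not Flink's real one:

```java
/** Simplified stand-in for the real Snapshotable interface, for illustration only. */
interface Snapshotable {
    /** Takes a snapshot of the current state. */
    void snapshot();
}

class StubDocBackend implements Snapshotable {
    /** @see Snapshotable */ // a bare stub like this replaces the inherited doc text in tools
    @Override
    public void snapshot() { }
}

class CleanBackend implements Snapshotable {
    // No Javadoc at all: doc tools fall back to Snapshotable#snapshot's comment.
    @Override
    public void snapshot() { }
}

public class JavadocDemo {
    public static void main(String[] args) {
        // Both behave identically at runtime; the difference shows up only in generated docs.
        new StubDocBackend().snapshot();
        new CleanBackend().snapshot();
    }
}
```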
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582785#comment-15582785 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83676729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java --- @@ -25,6 +25,13 @@ import java.util.HashSet; import java.util.Set; +/** + * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed. + * --- End diff -- The correct way of separating paragraphs in Javadoc is this: ``` Paragraph one. <p>Paragraph two ... ``` I know it's not proper HTML nowadays but that's how it's supposed to be ...
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582788#comment-15582788 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83678862 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * This interface provides a context in which operators that use managed state (i.e. state that is managed by state + * backends) can perform a snapshot. As snapshots of the backends themselves are taken by the system, this interface + * mainly provides meta information about the checkpoint. + */ +@PublicEvolving +public interface ManagedSnapshotContext { --- End diff -- Same comments as for `ManagedInitializationContext` hold here. 
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582789#comment-15582789 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83678698 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.OperatorStateStore; + +/** + * This interface provides a context in which operators can initialize by registering to managed state (i.e. state that + * is managed by state backends). + * + * + * Operator state is available to all operators, while keyed state is only available for operators after keyBy. + * + * + * For the purpose of initialization, the context signals if the state is empty (new operator) or was restored from + * a previous execution of this operator. 
+ * + */ +public interface ManagedInitializationContext { --- End diff -- I think this interface and its sub interfaces/implementations should be in the same module as `AbstractStreamOperator` and somewhere in the api package space. Also, the naming could be changed to something like `StateInitializationContext` -> `FunctionInitializationContext` -> `OperatorInitializationContext`. Or something reflecting their purpose but `StateInitializationContext` should be at the bottom of the hierarchy.
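One reading of the proposed naming, with `StateInitializationContext` as the most specific type at the bottom of the hierarchy, could be sketched like this (illustrative only; the method sets are invented and the final Flink names may differ):

```java
// Illustrative sketch of the naming proposal: generic contexts at the top,
// StateInitializationContext at the bottom of the hierarchy.
interface FunctionInitializationContext {
    boolean isRestored();
}

interface OperatorInitializationContext extends FunctionInitializationContext {
    // operator-level accessors would go here
}

interface StateInitializationContext extends OperatorInitializationContext {
    // state-store accessors would go here
}

public class ContextHierarchyDemo {
    public static void main(String[] args) {
        // One inherited abstract method overall, so a lambda can implement it.
        StateInitializationContext ctx = () -> true;
        // The specific context is usable wherever a broader one is expected.
        FunctionInitializationContext functionCtx = ctx;
        System.out.println(functionCtx.isRestored()); // prints true
    }
}
```

With this shape, user functions can accept the broad `FunctionInitializationContext` while the runtime hands operators the richer bottom type.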
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582795#comment-15582795 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83683321 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.List; + +/** + * This class holds all state handles for one operator. + */ +public class OperatorStateHandles { --- End diff -- This should be `@Internal` or at least `@PublicEvolving`. 
Also, the name clashes a bit with `OperatorStateHandle` which does something quite different.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582775#comment-15582775 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83663696 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -305,39 +306,42 @@ public void close() throws Exception { super.close(); } } - + // // Checkpoint and restore // - @Override - public void initializeState(OperatorStateStore stateStore) throws Exception { - this.stateStore = stateStore; + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { - ListState<Serializable> offsets = - stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); + OperatorStateStore stateStore = context.getManagedOperatorStateStore(); + offsetsStateForCheckpoint = stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); - restoreToOffset = new HashMap<>(); + if (context.isRestored()) { + restoreToOffset = new HashMap<>(); + for (Serializable serializable : offsetsStateForCheckpoint.get()) { + @SuppressWarnings("unchecked") + Tuple2<KafkaTopicPartition, Long> kafkaOffset = (Tuple2<KafkaTopicPartition, Long>) serializable; + restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + } - for (Serializable serializable : offsets.get()) { - @SuppressWarnings("unchecked") - Tuple2<KafkaTopicPartition, Long> kafkaOffset = (Tuple2<KafkaTopicPartition, Long>) serializable; - restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + LOG.info("Setting restore state in the FlinkKafkaConsumer."); + if (LOG.isDebugEnabled()) { + LOG.debug("Using the following offsets: {}", restoreToOffset); + } + } else { + LOG.info("No restore state for FlinkKafkaConsumer."); } - - LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoreToOffset); } @Override - public void prepareSnapshot(long checkpointId, long
timestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { if (!running) { LOG.debug("storeOperatorState() called on closed source"); } else { - ListState listState = - stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); - listState.clear(); + offsetsStateForCheckpoint.clear(); final AbstractFetcher fetcher = this.kafkaFetcher; if (fetcher == null) { --- End diff -- This is a workaround for the fact that we initialise the fetcher in `run()` and not in `open()`. Might be worthwhile to change that in a follow-up, if at all possible.
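The restructured consumer follows a snapshot/initialize pattern: on initialize, rebuild the in-memory offset map only when `context.isRestored()`; on snapshot, clear the list state and re-add the current offsets. A self-contained sketch of that flow, with plain Java collections standing in for Flink's `ListState` and context objects (all names here are illustrative):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetCheckpointDemo {
    // Stand-in for the consumer's ListState of (partition, offset) pairs.
    private final List<Map.Entry<String, Long>> offsetsStateForCheckpoint = new ArrayList<>();
    private Map<String, Long> restoredOffsets; // stays null on a fresh (non-restored) start

    // Mirrors initializeState(context): rebuild the map only when restoring.
    public void initializeState(boolean isRestored) {
        if (isRestored) {
            restoredOffsets = new HashMap<>();
            for (Map.Entry<String, Long> entry : offsetsStateForCheckpoint) {
                restoredOffsets.put(entry.getKey(), entry.getValue());
            }
        }
    }

    // Mirrors snapshotState(context): clear the list state, then re-add current offsets.
    public void snapshotState(Map<String, Long> currentOffsets) {
        offsetsStateForCheckpoint.clear();
        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            offsetsStateForCheckpoint.add(new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));
        }
    }

    public Map<String, Long> getRestoredOffsets() {
        return restoredOffsets;
    }

    public static void main(String[] args) {
        OffsetCheckpointDemo consumer = new OffsetCheckpointDemo();
        Map<String, Long> current = new HashMap<>();
        current.put("topic-0", 42L);
        consumer.snapshotState(current); // checkpoint
        consumer.initializeState(true);  // restore from that checkpoint
        System.out.println(consumer.getRestoredOffsets()); // prints {topic-0=42}
    }
}
```

Keeping the restore map null unless `isRestored()` is true is what distinguishes a genuine restore from a fresh start, which is the behavioral change the diff introduces.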
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582780#comment-15582780 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83661983 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -55,20 +56,20 @@ /** * Base class of all Flink Kafka Consumer data sources. * This implements the common behavior across all Kafka versions. - * + * --- End diff -- This file contains a lot of whitespace changes. It would be good to remove them before we merge this.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582778#comment-15582778 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83661373 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -348,6 +345,11 @@ public void prepareSnapshot(long checkpointId, long timestamp) throws Exception } } + @Override --- End diff -- The methods don't need to be reordered here. Also, the state store is not used anywhere, as far as I can see.
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582781#comment-15582781 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83669355 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState( LOG.info("Restoring from latest valid checkpoint: {}.", latest); - for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) { - TaskState taskState = taskGroupStateEntry.getValue(); - ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey()); - - if (executionJobVertex != null) { - // check that the number of key groups have not changed - if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { - throw new IllegalStateException("The maximum parallelism (" + - taskState.getMaxParallelism() + ") with which the latest " + - "checkpoint of the execution job vertex " + executionJobVertex + - " has been taken and the current maximum parallelism (" + - executionJobVertex.getMaxParallelism() + ") changed. This " + - "is currently not supported."); - } - - - int oldParallelism = taskState.getParallelism(); - int newParallelism = executionJobVertex.getParallelism(); - boolean parallelismChanged = oldParallelism != newParallelism; - boolean hasNonPartitionedState = taskState.hasNonPartitionedState(); - - if (hasNonPartitionedState && parallelismChanged) { - throw new IllegalStateException("Cannot restore the latest checkpoint because " + - "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + - "state and its parallelism changed.
The operator" + executionJobVertex.getJobVertexId() + - " has parallelism " + newParallelism + " whereas the corresponding" + - "state object has a parallelism of " + oldParallelism); - } - - List keyGroupPartitions = createKeyGroupPartitions( - executionJobVertex.getMaxParallelism(), - newParallelism); - - // operator chain index -> list of the stored partitionables states from all parallel instances - @SuppressWarnings("unchecked") - List[] chainParallelStates = - new List[taskState.getChainLength()]; - - for (int i = 0; i < oldParallelism; ++i) { - - ChainedStateHandle partitionableState = - taskState.getPartitionableState(i); - - if (partitionableState != null) { - for (int j = 0; j < partitionableState.getLength(); ++j) { - OperatorStateHandle opParalleState = partitionableState.get(j); - if (opParalleState != null) { - List opParallelStates = - chainParallelStates[j]; - if (opParallelStates == null) { -
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83683321 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.List; + +/** + * This class holds all state handles for one operator. + */ +public class OperatorStateHandles { --- End diff -- This should be `@Internal` or at least `@PublicEvolving`. Also, the name clashes a bit with `OperatorStateHandle` which does something quite different. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582783#comment-15582783 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83662569
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -80,38 +81,38 @@
 	//
 	private final List<String> topics;

 	/** The schema to convert between Kafka's byte messages, and Flink's objects */
 	protected final KeyedDeserializationSchema<T> deserializer;

 	/** The set of topic partitions that the source will read */
 	protected List<KafkaTopicPartition> subscribedPartitions;

 	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
 	 * to exploit per-partition timestamp characteristics.
 	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
 	private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;

 	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
 	 * to exploit per-partition timestamp characteristics.
 	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
 	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

-	private transient OperatorStateStore stateStore;
+	private transient ListState<Serializable> offsetsStateForCheckpoint;
--- End diff --
This can have a more concrete type. You changed `OperatorStateStore.getSerializableListState` to this:
```
ListState<T> getSerializableListState(String stateName) throws Exception;
```
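As an illustration of the reviewer's point: once `getSerializableListState` is generic, the state field can carry its concrete element type instead of the raw `Serializable`. The sketch below uses stub types — `ListState`, `SimpleListState`, and `TypedStateSketch` are hypothetical stand-ins for illustration, not Flink's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal stand-in for a list-state handle; not Flink's real interface. */
interface ListState<T> {
    Iterable<T> get();
    void add(T value);
}

public class TypedStateSketch {

    /** Simple in-memory implementation backing the sketch. */
    static class SimpleListState<T> implements ListState<T> {
        private final List<T> values = new ArrayList<>();
        @Override public Iterable<T> get() { return values; }
        @Override public void add(T value) { values.add(value); }
    }

    /** A generic signature lets callers receive a typed handle. */
    static <T> ListState<T> getSerializableListState(String stateName) {
        return new SimpleListState<>();
    }

    public static void main(String[] args) {
        // The consumer field can now be declared with a concrete element
        // type instead of the raw Serializable used before:
        ListState<Long> offsets = getSerializableListState("topic-partition-offsets");
        offsets.add(42L);
        for (Long offset : offsets.get()) {
            System.out.println(offset);
        }
    }
}
```

The typed declaration removes the unchecked cast at every read site, which is exactly what makes the field's raw `Serializable` element type unnecessary.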
[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State
[ https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582790#comment-15582790 ] ASF GitHub Bot commented on FLINK-4844: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83680077 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import java.io.IOException; +import java.io.InputStream; + +public class NonClosingStreamDecorator extends InputStream { --- End diff -- It's quite clear what it does but Javadocs would still be nice.
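For reference, a decorator of this shape — with the Javadoc the review asks for — could look roughly like the sketch below. This is a guess from the class name alone, not the actual Flink implementation:

```java
import java.io.IOException;
import java.io.InputStream;

/**
 * An {@link InputStream} decorator that forwards all reads to a delegate
 * stream but turns {@link #close()} into a no-op. This is useful when a
 * stream must be handed to code that closes it eagerly, while the
 * underlying stream (e.g. a checkpoint stream) has to stay open for
 * further use by the caller.
 */
public class NonClosingStreamDecorator extends InputStream {

    private final InputStream delegate;

    public NonClosingStreamDecorator(InputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return delegate.read(b, off, len);
    }

    @Override
    public void close() {
        // Intentionally does not close the delegate.
    }
}
```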
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83676027 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractPartitionedCheckpointOutputStream.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Output stream that allows to write state into several partitions. + * @param <T> type of the returned state handle. + */ +public abstract class AbstractPartitionedCheckpointOutputStream<T> extends OutputStream { --- End diff -- I think the javadoc and class name don't accurately describe what this does (possibly due to some refactoring). Now it should probably be called something like `NonClosingCheckpointOutputStream`.
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83677603 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java --- @@ -132,7 +133,7 @@ public DefaultOperatorStateBackend(ClassLoader userClassLoader) { } /** -* @see SnapshotProvider +* @see Snapshotable --- End diff -- I think an empty Javadoc simply prevents tools from displaying the Javadoc of the overridden method, so it's probably best to remove those. There are also more instances of that in this file.
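The Javadoc-inheritance behavior mentioned here can be demonstrated with two small types (`Snapshotable` and `DefaultBackend` below are illustrative stand-ins, not the actual Flink classes): an overriding method with no Javadoc of its own inherits the overridden method's documentation, while a stub comment such as `/** @see Snapshotable */` replaces it.

```java
/** Hypothetical stand-in for the Snapshotable interface. */
interface Snapshotable {
    /**
     * Takes a snapshot of the current state.
     * With no Javadoc on the override, tools show this text there too.
     */
    void snapshot();
}

public class DefaultBackend implements Snapshotable {

    // No Javadoc here on purpose: documentation tools fall back to the
    // inherited Snapshotable#snapshot() doc. A near-empty comment such as
    // "/** @see Snapshotable */" would suppress that fallback.
    @Override
    public void snapshot() {
        System.out.println("snapshot taken");
    }
}
```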
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83679164 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsList.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +public interface KeyGroupsList extends Iterable<Integer> { --- End diff -- This one could benefit from some Javadocs.
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83661983 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -55,20 +56,20 @@ /** * Base class of all Flink Kafka Consumer data sources. * This implements the common behavior across all Kafka versions. - * + * --- End diff -- This file contains a lot of whitespace changes. It would be good to remove them before we merge this.
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83679673 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.concurrent.RunnableFuture; + +public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext { --- End diff -- Some Javadocs would probably be helpful.
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83666257 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java --- @@ -18,14 +18,18 @@ package org.apache.flink.api.common.state; +import org.apache.flink.annotation.PublicEvolving; + import java.io.Serializable; import java.util.Set; /** - * Interface for a backend that manages operator state. + * This interface contains methods for registering operator state with a managed store. */ +@PublicEvolving public interface OperatorStateStore { + /** The default namespace for state in cases where no state name is provided */ String DEFAULT_OPERATOR_STATE_NAME = "_default_"; --- End diff -- This is an implementation detail that should not be exposed on this interface.
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83678862 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * This interface provides a context in which operators that use managed state (i.e. state that is managed by state + * backends) can perform a snapshot. As snapshots of the backends themselves are taken by the system, this interface + * mainly provides meta information about the checkpoint. + */ +@PublicEvolving +public interface ManagedSnapshotContext { --- End diff -- Same comments as for `ManagedInitializationContext` hold here.
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83678698 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.OperatorStateStore; + +/** + * This interface provides a context in which operators can initialize by registering to managed state (i.e. state that + * is managed by state backends). + * + * + * Operator state is available to all operators, while keyed state is only available for operators after keyBy. + * + * + * For the purpose of initialization, the context signals if the state is empty (new operator) or was restored from + * a previous execution of this operator. + * + */ +public interface ManagedInitializationContext { --- End diff -- I think this interface and its sub interfaces/implementations should be in the same module as `AbstractStreamOperator` and somewhere in the api package space. 
Also, the naming could be changed to something like `StateInitializationContext` -> `FunctionInitializationContext` -> `OperatorInitializationContext`. Or something reflecting their purpose but `StateInitializationContext` should be at the bottom of the hierarchy.
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83676729
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java ---
@@ -25,6 +25,13 @@ import java.util.HashSet; import java.util.Set;
+/**
+ * This class allows to register instances of {@link Closeable}, which are all closed if this registry is closed.
+ *
--- End diff --
The correct way of separating paragraphs in Javadoc is this:
```
Paragraph one.

<p>Paragraph two ...
```
I know it's not proper HTML nowadays but that's how it's supposed to be ...
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83664806 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -48,7 +43,7 @@ * {@link AbstractRichFunction#getRuntimeContext()}. */ @Public -public interface RuntimeContext { +public interface RuntimeContext extends KeyedStateStore { --- End diff -- I think it would be better to not have `RuntimeContext` be a `KeyedStateStore`. In the not-so-far future `RuntimeContext` will probably provide a `KeyedStateStore` or at least use one internally to implement the state methods. Properly separating the two now seems prudent.
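The separation suggested here — `RuntimeContext` providing a `KeyedStateStore` rather than extending it — can be sketched as follows. All types below are simplified hypothetical stand-ins, not Flink's real signatures:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stub for keyed-state access (hypothetical, not Flink's real interface). */
interface KeyedStateStore {
    <T> List<T> getListState(String name);
}

/** Composition: the runtime context provides a store rather than extending it. */
interface RuntimeContext {
    KeyedStateStore getKeyedStateStore();
}

public class CompositionSketch {

    /** Simple in-memory store used by the sketch context. */
    static class SimpleKeyedStateStore implements KeyedStateStore {
        private final Map<String, List<?>> states = new HashMap<>();

        @Override
        @SuppressWarnings("unchecked")
        public <T> List<T> getListState(String name) {
            return (List<T>) states.computeIfAbsent(name, k -> new ArrayList<>());
        }
    }

    /** The context delegates to an internal store instead of inheriting from it. */
    static class SimpleRuntimeContext implements RuntimeContext {
        private final KeyedStateStore store = new SimpleKeyedStateStore();

        @Override
        public KeyedStateStore getKeyedStateStore() {
            return store;
        }
    }

    public static void main(String[] args) {
        RuntimeContext ctx = new SimpleRuntimeContext();
        ctx.getKeyedStateStore().<Integer>getListState("counts").add(1);
        System.out.println(ctx.getKeyedStateStore().getListState("counts").size());
    }
}
```

With this shape, the state-access surface can evolve independently of `RuntimeContext`, which is the point of the review comment.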
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83674157 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.util.CollectionUtil; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * This class encapsulates all state handles for a task. + */ +public class TaskStateHandles implements Serializable { --- End diff -- Very good addition for simplifying the handling of all the state handles that are flying around.
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83670574 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java --- @@ -38,8 +38,8 @@ private final StreamStateHandle delegateStateHandle; public OperatorStateHandle( - StreamStateHandle delegateStateHandle, - MapstateNameToPartitionOffsets) { + Map stateNameToPartitionOffsets, --- End diff -- This is only changing ordering but it's triggering some one-line changes in other files that make it hard to keep track of what changes are really changes. Could you maybe revert that and change it in a follow-up?
[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2648#discussion_r83663696 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -305,39 +306,42 @@ public void close() throws Exception { super.close(); } } - + // // Checkpoint and restore // - @Override - public void initializeState(OperatorStateStore stateStore) throws Exception { - this.stateStore = stateStore; + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { - ListState offsets = - stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); + OperatorStateStore stateStore = context.getManagedOperatorStateStore(); + offsetsStateForCheckpoint = stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); - restoreToOffset = new HashMap<>(); + if (context.isRestored()) { + restoreToOffset = new HashMap<>(); + for (Serializable serializable : offsetsStateForCheckpoint.get()) { + @SuppressWarnings("unchecked") + Tuple2kafkaOffset = (Tuple2 ) serializable; + restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + } - for (Serializable serializable : offsets.get()) { - @SuppressWarnings("unchecked") - Tuple2 kafkaOffset = (Tuple2 ) serializable; - restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); + LOG.info("Setting restore state in the FlinkKafkaConsumer."); + if (LOG.isDebugEnabled()) { + LOG.debug("Using the following offsets: {}", restoreToOffset); + } + } else { + LOG.info("No restore state for FlinkKafkaConsumer."); } - - LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoreToOffset); } @Override - public void prepareSnapshot(long checkpointId, long timestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { if (!running) { LOG.debug("storeOperatorState() called on closed source"); } else { - 
ListState listState = - stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME); - listState.clear(); + offsetsStateForCheckpoint.clear(); final AbstractFetcher fetcher = this.kafkaFetcher; if (fetcher == null) { --- End diff -- This is a workaround for the fact that we initialise the fetcher in `run()` and not in `open()`. Might be worthwhile to change that in a follow-up, if at all possible.
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582735#comment-15582735 ] ASF GitHub Bot commented on FLINK-4715: --- GitHub user uce opened a pull request: https://github.com/apache/flink/pull/2652 [FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck - Splits the cancellation up into two threads: * The `TaskCanceler` calls `cancel` on the invokable and `interrupt` on the executing Thread. It then exits. * The `TaskCancellationWatchDog` kicks in after the task cancellation timeout (current default: 30 secs) and periodically calls `interrupt` on the executing Thread. If the Thread does not terminate within the task cancellation timeout (new config value, default 3 mins), the task manager is notified about a fatal error, leading to termination of the JVM. - The new configuration is exposed via `ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS` (default: 3 mins) and the `ExecutionConfig` (similar to the cancellation interval). You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink 4715-suicide Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2652.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2652 > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/2652 [FLINK-4715] Fail TaskManager with fatal error if task cancellation is stuck - Splits the cancellation up into two threads: * The `TaskCanceler` calls `cancel` on the invokable and `interrupt` on the executing Thread. It then exits. * The `TaskCancellationWatchDog` kicks in after the task cancellation timeout (current default: 30 secs) and periodically calls `interrupt` on the executing Thread. If the Thread does not terminate within the task cancellation timeout (new config value, default 3 mins), the task manager is notified about a fatal error, leading to termination of the JVM. - The new configuration is exposed via `ConfigConstants.TASK_CANCELLATION_TIMEOUT_MILLIS` (default: 3 mins) and the `ExecutionConfig` (similar to the cancellation interval). You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/flink 4715-suicide Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2652.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2652
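The two-thread cancellation scheme described in the PR can be sketched roughly as follows. This is a minimal illustration, not Flink's actual implementation: `CancellationWatchdog` and `cancelWithWatchdog` are hypothetical names, the stubborn task stands in for faulty user code, and the fatal-error escalation is stubbed with a boolean return value.

```java
import java.util.concurrent.TimeUnit;

public class CancellationWatchdog {

    // Returns true if cancellation had to be escalated to a fatal error,
    // i.e. the task thread was still alive after the overall timeout.
    static boolean cancelWithWatchdog(long timeoutMillis) throws InterruptedException {
        // Simulated faulty user code that keeps swallowing interrupts.
        Thread task = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(10_000);
                } catch (InterruptedException ignored) {
                    // swallow the interrupt and keep running
                }
            }
        });
        task.setDaemon(true); // let the JVM exit even though the task never stops
        task.start();

        // "TaskCanceler": a single interrupt, then it exits.
        task.interrupt();

        // Watchdog: periodically re-interrupt until the overall timeout expires.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (task.isAlive() && System.nanoTime() < deadline) {
            task.interrupt();
            task.join(50);
        }
        // Still alive: in Flink this is where the TaskManager would be
        // notified of a fatal error, terminating the JVM.
        return task.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("escalated to fatal error: " + cancelWithWatchdog(300));
    }
}
```

Since the simulated task never terminates, the sketch always ends in escalation; a cooperative task would exit on the first interrupt and the watchdog loop would fall through without triggering the fatal path.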
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582606#comment-15582606 ] ASF GitHub Bot commented on FLINK-3599: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2211 Ah, thanks for pointing out the dependency on #2094. I wasn't aware of that. Will try to push that PR further then :-). > GSoC: Code Generation in Serializers > > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Márton Balassi >Assignee: Gabor Horvath > Labels: gsoc2016, mentor > > The current implementation of the serializers can be a > performance bottleneck in some scenarios. These performance problems were > also reported on the mailing list recently [1]. > E.g. the PojoSerializer uses reflection for accessing the fields, which is > slow [2]. > For the complete proposal see [3]. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369 > [3] > https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2211 Ah, thanks for pointing out the dependency on #2094. I wasn't aware of that. Will try to push that PR further then :-).
[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs
[ https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582605#comment-15582605 ] Maximilian Michels commented on FLINK-2821: --- Thanks for testing [~philipp.bussche]. There was an issue with the dependency management of my custom build. I've resolved the issue and would like to invite you to try again: http://people.apache.org/~mxm/flink-1.1.3-custom-akka.zip With the new version, you should be able to run your containers. Next, we should resolve why you can't use the external address of the Rancher container and bind to the internal container address. What kind of error do you get? > Change Akka configuration to allow accessing actors from different URLs > --- > > Key: FLINK-2821 > URL: https://issues.apache.org/jira/browse/FLINK-2821 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Reporter: Robert Metzger >Assignee: Maximilian Michels > > Akka expects the actor's URL to be exactly matching. > As pointed out here, cases where users were complaining about this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html > - Proxy routing (as described here, send to the proxy URL, receiver > recognizes only original URL) > - Using hostname / IP interchangeably does not work (we solved this by > always putting IP addresses into URLs, never hostnames) > - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still > no solution to that (but seems not too much of a restriction) > I am aware that this is not possible due to Akka, so it is actually not a > Flink bug. But I think we should track the resolution of the issue here > anyway because it's affecting our users' satisfaction.
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582596#comment-15582596 ] ASF GitHub Bot commented on FLINK-3599: --- Github user Xazax-hun commented on the issue: https://github.com/apache/flink/pull/2211 Hi! This patch depends on the following pull request: https://github.com/apache/flink/pull/2094 Once it is landed I will remove the [WIP] tag. I did not remove it yet because I did not want the reviewer to review changes that were not done by me. > GSoC: Code Generation in Serializers > > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Márton Balassi >Assignee: Gabor Horvath > Labels: gsoc2016, mentor > > The current implementation of the serializers can be a > performance bottleneck in some scenarios. These performance problems were > also reported on the mailing list recently [1]. > E.g. the PojoSerializer uses reflection for accessing the fields, which is > slow [2]. > For the complete proposal see [3]. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369 > [3] > https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...
Github user Xazax-hun commented on the issue: https://github.com/apache/flink/pull/2211 Hi! This patch depends on the following pull request: https://github.com/apache/flink/pull/2094 Once it is landed I will remove the [WIP] tag. I did not remove it yet because I did not want the reviewer to review changes that were not done by me.
[GitHub] flink pull request #2651: [FLINK-4847] Let RpcEndpoint.start/shutDown throw ...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2651 [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions Allowing RpcEndpoint.start/shutDown to throw exceptions will help rpc endpoints fail quickly without having to use a callback like the FatalErrorHandler. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink rpcExceptions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2651.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2651 commit ddf35c4ddb04629cddebb2401488effe93416b70 Author: Till Rohrmann Date: 2016-10-17T14:22:16Z [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions Allowing RpcEndpoint.start/shutDown to throw exceptions will help rpc endpoints fail quickly without having to use a callback like the FatalErrorHandler.
[jira] [Commented] (FLINK-4847) Let RpcEndpoint.start/shutDown throw exceptions
[ https://issues.apache.org/jira/browse/FLINK-4847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582405#comment-15582405 ] ASF GitHub Bot commented on FLINK-4847: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2651 [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions Allowing RpcEndpoint.start/shutDown to throw exceptions will help rpc endpoints fail quickly without having to use a callback like the FatalErrorHandler. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink rpcExceptions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2651.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2651 commit ddf35c4ddb04629cddebb2401488effe93416b70 Author: Till Rohrmann Date: 2016-10-17T14:22:16Z [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions Allowing RpcEndpoint.start/shutDown to throw exceptions will help rpc endpoints fail quickly without having to use a callback like the FatalErrorHandler. > Let RpcEndpoint.start/shutDown throw exceptions > --- > > Key: FLINK-4847 > URL: https://issues.apache.org/jira/browse/FLINK-4847 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{RpcEndpoint.start}} and {{RpcEndpoint.shutDown}} methods should be > allowed to throw exceptions if things go wrong. Otherwise, exceptions will be > given to a callback which handles them later, even though we know that we can > fail the components right away (as it is the case for the {{TaskExecutor}}, > for example).
[jira] [Created] (FLINK-4847) Let RpcEndpoint.start/shutDown throw exceptions
Till Rohrmann created FLINK-4847: Summary: Let RpcEndpoint.start/shutDown throw exceptions Key: FLINK-4847 URL: https://issues.apache.org/jira/browse/FLINK-4847 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Reporter: Till Rohrmann The {{RpcEndpoint.start}} and {{RpcEndpoint.shutDown}} methods should be allowed to throw exceptions if things go wrong. Otherwise, exceptions will be given to a callback which handles them later, even though we know that we can fail the components right away (as it is the case for the {{TaskExecutor}}, for example).
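The fail-fast contract proposed in FLINK-4847 can be illustrated with a small sketch. All names here (`RpcEndpointSketch`, `BrokenEndpoint`, `tryStart`) are hypothetical and only mirror the idea from the issue text: when `start()` is allowed to declare a checked exception, the caller sees a broken endpoint synchronously instead of via a later callback.

```java
public class RpcEndpointSketch {

    // With throwing start()/shutDown(), errors surface synchronously and the
    // caller can fail the component right away.
    abstract static class RpcEndpoint {
        void start() throws Exception {}
        void shutDown() throws Exception {}
    }

    // An endpoint whose startup always fails, standing in for e.g. a
    // component that cannot bring up its RPC server.
    static class BrokenEndpoint extends RpcEndpoint {
        @Override
        void start() throws Exception {
            throw new Exception("could not start RPC server");
        }
    }

    static String tryStart(RpcEndpoint endpoint) {
        try {
            endpoint.start();
            return "started";
        } catch (Exception e) {
            // No FatalErrorHandler-style callback needed: the failure is
            // observed at the call site.
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryStart(new BrokenEndpoint()));
    }
}
```

The contrast is with the callback style, where `start()` returns normally and the error is delivered later to a handler, forcing every caller to wire one up even for failures that are known immediately.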
[GitHub] flink issue #2498: [FLINK-4619] - JobManager does not answer to client when ...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2498 Yes, I'll merge this later today.
[jira] [Commented] (FLINK-4619) JobManager does not answer to client when restore from savepoint fails
[ https://issues.apache.org/jira/browse/FLINK-4619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582373#comment-15582373 ] ASF GitHub Bot commented on FLINK-4619: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2498 Yes, I'll merge this later today. > JobManager does not answer to client when restore from savepoint fails > -- > > Key: FLINK-4619 > URL: https://issues.apache.org/jira/browse/FLINK-4619 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.0, 1.1.1, 1.1.2 >Reporter: Maciej Prochniak > Fix For: 1.2.0, 1.1.3 > > > When the savepoint used is incompatible with the currently deployed process, the job > manager never returns (jobInfo.notifyClients is not invoked in one of the > try-catch blocks)
[jira] [Assigned] (FLINK-4847) Let RpcEndpoint.start/shutDown throw exceptions
[ https://issues.apache.org/jira/browse/FLINK-4847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-4847: Assignee: Till Rohrmann > Let RpcEndpoint.start/shutDown throw exceptions > --- > > Key: FLINK-4847 > URL: https://issues.apache.org/jira/browse/FLINK-4847 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{RpcEndpoint.start}} and {{RpcEndpoint.shutDown}} methods should be > allowed to throw exceptions if things go wrong. Otherwise, exceptions will be > given to a callback which handles them later, even though we know that we can > fail the components right away (as it is the case for the {{TaskExecutor}}, > for example).
[jira] [Commented] (FLINK-4839) JobManager handle TaskManager's slot offering
[ https://issues.apache.org/jira/browse/FLINK-4839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582341#comment-15582341 ] ASF GitHub Bot commented on FLINK-4839: --- Github user KurtYoung closed the pull request at: https://github.com/apache/flink/pull/2647 > JobManager handle TaskManager's slot offering > - > > Key: FLINK-4839 > URL: https://issues.apache.org/jira/browse/FLINK-4839 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Kurt Young > > JobManager receives the TaskManager's slot offers, and decides which slots to > accept.
[GitHub] flink issue #2647: [FLINK-4839] [cluster management] JobManager handle TaskM...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/2647 Thanks for your review @tillrohrmann, closing it now.
[jira] [Commented] (FLINK-4839) JobManager handle TaskManager's slot offering
[ https://issues.apache.org/jira/browse/FLINK-4839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582342#comment-15582342 ] ASF GitHub Bot commented on FLINK-4839: --- Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/2647 Thanks for your review @tillrohrmann, closing it now. > JobManager handle TaskManager's slot offering > - > > Key: FLINK-4839 > URL: https://issues.apache.org/jira/browse/FLINK-4839 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Kurt Young > > JobManager receives the TaskManager's slot offers, and decides which slots to > accept.
[GitHub] flink pull request #2647: [FLINK-4839] [cluster management] JobManager handl...
Github user KurtYoung closed the pull request at: https://github.com/apache/flink/pull/2647
[jira] [Commented] (FLINK-4840) Introduce an OperatorSystemMetricGroup
[ https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582332#comment-15582332 ] Stephan Ewen commented on FLINK-4840: - To me, throughput and latency could be in the I/O scope, because throughput is the number of records in/out per time. > Introduce an OperatorSystemMetricGroup > -- > > Key: FLINK-4840 > URL: https://issues.apache.org/jira/browse/FLINK-4840 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: zhuhaifeng >Assignee: zhuhaifeng >Priority: Minor > Fix For: 1.2.0 > > > This introduces the OperatorSystemMetricGroup, which encapsulates the > instantiation of the TPS meter and the latency/proc_time_cost histograms. > Operator-related system metrics are not instantiated directly by the specific > operator, but instead within the OperatorSystemMetricGroup contained in the > respective OperatorMetricGroup. They are then later accessed by the relevant > components (maybe in different places), instead of being instantiated identically > with static name constants. Other system-scope metrics (maybe > delay/queue_in/queue_out) can be added to the OperatorSystemMetricGroup > later. > TPS: records collected per second (StreamSource), elements processed per > second (other operators) > latency/proc_time_cost: time cost of collecting a record (StreamSource), > time cost of processing an element (other operators)
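As a rough illustration of the "records in/out per time" framing in the comment above, a throughput meter reduces to a counter plus a time base. This is a hypothetical sketch, not Flink's actual Meter API; the start time is injected so the rate can be computed deterministically.

```java
public class ThroughputMeter {
    private long count;
    private final long startNanos;

    public ThroughputMeter(long startNanos) {
        this.startNanos = startNanos;
    }

    // Called once per record passing the operator.
    public void markEvent() {
        count++;
    }

    public long getCount() {
        return count;
    }

    // Records per second, averaged since the meter was created.
    public double ratePerSecond(long nowNanos) {
        double elapsedSeconds = (nowNanos - startNanos) / 1e9;
        return elapsedSeconds > 0 ? count / elapsedSeconds : 0.0;
    }

    public static void main(String[] args) {
        ThroughputMeter meter = new ThroughputMeter(0L);
        for (int i = 0; i < 1000; i++) {
            meter.markEvent();
        }
        // 1000 events over a simulated 2 seconds -> 500.0 records/sec
        System.out.println(meter.ratePerSecond(2_000_000_000L));
    }
}
```

This also shows why a separate per-record processing-time metric can look redundant, as questioned further below in the thread: with a known record count per interval, average processing time is just the inverse of the rate.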
[GitHub] flink issue #2646: [FLINK-4843] Test for FsCheckpointStateOutputStream::getP...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2646 Thanks for the update, looks good! +1 to merge this
[jira] [Commented] (FLINK-4843) Introduce Test for FsCheckpointStateOutputStream::getPos
[ https://issues.apache.org/jira/browse/FLINK-4843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582323#comment-15582323 ] ASF GitHub Bot commented on FLINK-4843: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2646 Thanks for the update, looks good! +1 to merge this > Introduce Test for FsCheckpointStateOutputStream::getPos > > > Key: FLINK-4843 > URL: https://issues.apache.org/jira/browse/FLINK-4843 > Project: Flink > Issue Type: Test >Reporter: Stefan Richter >Assignee: Stefan Richter > > Introduce a test for FsCheckpointStateOutputStream::getPos, which is > currently not included in the tests.
[jira] [Commented] (FLINK-4840) Introduce an OperatorSystemMetricGroup
[ https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582316#comment-15582316 ] Chesnay Schepler commented on FLINK-4840: - recordsPerSecond will be added into the OperatorIOMetricGroup once the MetricViews are merged (https://issues.apache.org/jira/browse/FLINK-3950). The processing time metric seems a bit redundant, can't you infer that from the number of records processed per second? > Introduce an OperatorSystemMetricGroup > -- > > Key: FLINK-4840 > URL: https://issues.apache.org/jira/browse/FLINK-4840 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: zhuhaifeng >Assignee: zhuhaifeng >Priority: Minor > Fix For: 1.2.0 > > > This introduces the OperatorSystemMetricGroup, which encapsulates the > instantiation of the TPS meter and the latency/proc_time_cost histograms. > Operator-related system metrics are not instantiated directly by the specific > operator, but instead within the OperatorSystemMetricGroup contained in the > respective OperatorMetricGroup. They are then later accessed by the relevant > components (maybe in different places), instead of being instantiated identically > with static name constants. Other system-scope metrics (maybe > delay/queue_in/queue_out) can be added to the OperatorSystemMetricGroup > later. > TPS: records collected per second (StreamSource), elements processed per > second (other operators) > latency/proc_time_cost: time cost of collecting a record (StreamSource), > time cost of processing an element (other operators)
[jira] [Resolved] (FLINK-4839) JobManager handle TaskManager's slot offering
[ https://issues.apache.org/jira/browse/FLINK-4839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-4839. -- Resolution: Fixed Fixed via 4f891a6c26847ac66c477853101de31eb75993f7 > JobManager handle TaskManager's slot offering > - > > Key: FLINK-4839 > URL: https://issues.apache.org/jira/browse/FLINK-4839 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Assignee: Kurt Young > > JobManager receives the TaskManager's slot offers, and decide which slots to > accept.
[jira] [Commented] (FLINK-4510) Always create CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582301#comment-15582301 ] ASF GitHub Bot commented on FLINK-4510: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2453 Looks good to me now. > Always create CheckpointCoordinator > --- > > Key: FLINK-4510 > URL: https://issues.apache.org/jira/browse/FLINK-4510 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Jark Wu > > The checkpoint coordinator is only created if a checkpointing interval is > configured. This means that no savepoints can be triggered if there is no > checkpointing interval specified. > Instead we should always create it and allow an interval of 0 for disabled > periodic checkpoints.
[GitHub] flink issue #2453: [FLINK-4510] [checkpoint] Always create CheckpointCoordin...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2453 Looks good to me now.
[jira] [Created] (FLINK-4846) FlinkML - Pass "env" has an implicit parameter in MLUtils.readLibSVM
Thomas FOURNIER created FLINK-4846: -- Summary: FlinkML - Pass "env" has an implicit parameter in MLUtils.readLibSVM Key: FLINK-4846 URL: https://issues.apache.org/jira/browse/FLINK-4846 Project: Flink Issue Type: Improvement Reporter: Thomas FOURNIER Priority: Minor With Flink ML you can import file via MLUtils.readLibSVM (import org.apache.flink.ml.MLUtils) For example: val env = ExecutionEnvironment.getExecutionEnvironment val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "src/main/resources/svmguide1") I'd like to pass "env" as an implicit parameter and use the method as such: val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM("src/main/resources/svmguide1") Is it ok (not a scala specialist yet :) ) ? -- This message was sent by Atlassian JIRA (v6.3.4#6332)