[GitHub] flink pull request #2324: Bump version to 1.2-SNAPSHOT
GitHub user mbalassi opened a pull request: https://github.com/apache/flink/pull/2324 Bump version to 1.2-SNAPSHOT It is about time to do this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mbalassi/flink 12-bump Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2324.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2324 commit 3a958506d52ca60d4f58a627798ba65fd91cf02d Author: Marton Balassi, Date: 2016-08-01T23:20:20Z Bump version to 1.2-SNAPSHOT --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3414) Add Scala API for CEP's pattern definition
[ https://issues.apache.org/jira/browse/FLINK-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402825#comment-15402825 ] Ivan Mushketyk commented on FLINK-3414: --- Hey Till. Thank you for your reply. I'll implement an initial version of the DSL and create a PR and then we can continue discussing what should be changed in it. > Add Scala API for CEP's pattern definition > -- > > Key: FLINK-3414 > URL: https://issues.apache.org/jira/browse/FLINK-3414 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > Currently, the CEP library only supports a Java API to specify complex event > patterns. In order to make it a bit less verbose for Scala users, it would be > nice to also add a Scala API for the CEP library. > A Scala API would also allow to pass Scala's anonymous functions as filter > conditions or as a select function, for example, or to use partial functions > to distinguish between different events. > Furthermore, the Scala API could be designed to feel a bit more like a DSL: > {code} > begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || > "middle_2" where _.name equals "foobar" -> "end" where x => x.id <= x.volume > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3414) Add Scala API for CEP's pattern definition
[ https://issues.apache.org/jira/browse/FLINK-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-3414: - Assignee: Ivan Mushketyk > Add Scala API for CEP's pattern definition > -- > > Key: FLINK-3414 > URL: https://issues.apache.org/jira/browse/FLINK-3414 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > Currently, the CEP library only supports a Java API to specify complex event > patterns. In order to make it a bit less verbose for Scala users, it would be > nice to also add a Scala API for the CEP library. > A Scala API would also allow to pass Scala's anonymous functions as filter > conditions or as a select function, for example, or to use partial functions > to distinguish between different events. > Furthermore, the Scala API could be designed to feel a bit more like a DSL: > {code} > begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || > "middle_2" where _.name equals "foobar" -> "end" where x => x.id <= x.volume > {code}
[jira] [Comment Edited] (FLINK-3873) Add a Kafka TableSink with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402814#comment-15402814 ] Ivan Mushketyk edited comment on FLINK-3873 at 8/1/16 9:03 PM: --- I've implemented TableSink with Avro serialization here: https://github.com/mushketyk/flink/tree/kafka-avro, but since it is based on code from this pull-request https://github.com/apache/flink/pull/2244 (not yet merged) I cannot create a pull request. was (Author: ivan.mushketyk): I've implemented TableSink with Avro serialization here: https://github.com/apache/flink/pull/2244, but since it is based on code from this pull-request https://github.com/apache/flink/pull/2244 (not yet merged) I cannot create a pull request. > Add a Kafka TableSink with Avro serialization > - > > Key: FLINK-3873 > URL: https://issues.apache.org/jira/browse/FLINK-3873 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Add a TableSink that writes Avro serialized data to Kafka.
[jira] [Commented] (FLINK-3873) Add a Kafka TableSink with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402814#comment-15402814 ] Ivan Mushketyk commented on FLINK-3873: --- I've implemented TableSink with Avro serialization here: https://github.com/apache/flink/pull/2244, but since it is based on code from this pull-request https://github.com/apache/flink/pull/2244 (not yet merged) I cannot create a pull request. > Add a Kafka TableSink with Avro serialization > - > > Key: FLINK-3873 > URL: https://issues.apache.org/jira/browse/FLINK-3873 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Add a TableSink that writes Avro serialized data to Kafka.
[jira] [Commented] (FLINK-4043) Generalize RabbitMQ connector into AMQP connector
[ https://issues.apache.org/jira/browse/FLINK-4043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402806#comment-15402806 ] Ivan Mushketyk commented on FLINK-4043: --- If you are going to work on it, here is an implementation of the ActiveMQ connector: https://github.com/apache/flink/pull/2314 > Generalize RabbitMQ connector into AMQP connector > - > > Key: FLINK-4043 > URL: https://issues.apache.org/jira/browse/FLINK-4043 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Robert Metzger > > Our current RabbitMQ connector is actually using a AMQP client implemented by > RabbitMQ. > AMQP is a protocol for message queues, implemented by different clients and > brokers. > I'm suggesting to rename the connector so that its more obvious to users of > other brokers that they can use the connector as well.
[GitHub] flink issue #2323: [FLINK-2090] toString of CollectionInputFormat takes long...
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2323 Set maximum limit for the toString result, as suggested by Stephan here: https://issues.apache.org/jira/browse/FLINK-2090
[jira] [Commented] (FLINK-2090) toString of CollectionInputFormat takes long time when the collection is huge
[ https://issues.apache.org/jira/browse/FLINK-2090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402762#comment-15402762 ] ASF GitHub Bot commented on FLINK-2090: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2323 Set maximum limit for the toString result, as suggested by Stephan here: https://issues.apache.org/jira/browse/FLINK-2090 > toString of CollectionInputFormat takes long time when the collection is huge > - > > Key: FLINK-2090 > URL: https://issues.apache.org/jira/browse/FLINK-2090 > Project: Flink > Issue Type: Improvement >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > The {{toString}} method of {{CollectionInputFormat}} calls {{toString}} on > its underlying {{Collection}}. Thus, {{toString}} is called for each element > of the collection. If the {{Collection}} contains many elements or the > individual {{toString}} calls for each element take a long time, then the > string generation can take a considerable amount of time. [~mikiobraun] > noticed that when he inserted several jBLAS matrices into Flink. > The {{toString}} method is mainly used for logging statements in > {{DataSourceNode}}'s {{computeOperatorSpecificDefaultEstimates}} method and > in {{JobGraphGenerator.getDescriptionForUserCode}}. I'm wondering whether it > is necessary to print the complete content of the underlying {{Collection}} > or if it's not enough to print only the first 3 elements in the {{toString}} > method.
[jira] [Commented] (FLINK-2090) toString of CollectionInputFormat takes long time when the collection is huge
[ https://issues.apache.org/jira/browse/FLINK-2090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402759#comment-15402759 ] ASF GitHub Bot commented on FLINK-2090: --- GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/2323 [FLINK-2090] toString of CollectionInputFormat takes long time when t… Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed …he collection is huge You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink fast-to-string Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2323.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2323 commit 76c5b7dd1cf12b17b7601b2d1c8ea7cc475a031c Author: Ivan Mushketyk, Date: 2016-08-01T19:39:17Z [FLINK-2090] toString of CollectionInputFormat takes long time when the collection is huge > toString of 
CollectionInputFormat takes long time when the collection is huge > - > > Key: FLINK-2090 > URL: https://issues.apache.org/jira/browse/FLINK-2090 > Project: Flink > Issue Type: Improvement >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > The {{toString}} method of {{CollectionInputFormat}} calls {{toString}} on > its underlying {{Collection}}. Thus, {{toString}} is called for each element > of the collection. If the {{Collection}} contains many elements or the > individual {{toString}} calls for each element take a long time, then the > string generation can take a considerable amount of time. [~mikiobraun] > noticed that when he inserted several jBLAS matrices into Flink. > The {{toString}} method is mainly used for logging statements in > {{DataSourceNode}}'s {{computeOperatorSpecificDefaultEstimates}} method and > in {{JobGraphGenerator.getDescriptionForUserCode}}. I'm wondering whether it > is necessary to print the complete content of the underlying {{Collection}} > or if it's not enough to print only the first 3 elements in the {{toString}} > method.
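The fix discussed above caps how much of the collection {{toString}} renders. A minimal stand-alone sketch of the idea (class and method names are hypothetical, not Flink's actual CollectionInputFormat; the 3-element cut-off mirrors Stephan's suggestion in the ticket):

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical sketch: render at most a few elements of a collection
 * instead of calling toString on every element of a huge one.
 */
class BoundedToString {
    // Illustrative limit, echoing the "first 3 elements" idea from FLINK-2090.
    private static final int MAX_ELEMENTS = 3;

    static String describe(Collection<?> data) {
        StringBuilder sb = new StringBuilder("[");
        Iterator<?> it = data.iterator();
        for (int i = 0; i < MAX_ELEMENTS && it.hasNext(); i++) {
            if (i > 0) sb.append(", ");
            sb.append(it.next());
        }
        if (it.hasNext()) {
            sb.append(", ...");  // truncation marker instead of the remaining elements
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(List.of(1, 2, 3, 4, 5)));  // prints [1, 2, 3, ...]
    }
}
```

The merged change reportedly bounds the overall string length rather than the element count, but either variant keeps log statements from walking the whole collection.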
[GitHub] flink pull request #2323: [FLINK-2090] toString of CollectionInputFormat tak...
GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/2323 [FLINK-2090] toString of CollectionInputFormat takes long time when t… Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed …he collection is huge You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink fast-to-string Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2323.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2323 commit 76c5b7dd1cf12b17b7601b2d1c8ea7cc475a031c Author: Ivan Mushketyk, Date: 2016-08-01T19:39:17Z [FLINK-2090] toString of CollectionInputFormat takes long time when the collection is huge
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402673#comment-15402673 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73037620 --- Diff: docs/internals/flink_security.md --- @@ -0,0 +1,87 @@ +--- +title: "Flink Security" +# Top navigation +top-nav-group: internals +top-nav-pos: 10 +top-nav-title: Flink Security +--- + + +This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) +and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers +who plans to run Flink on a secure environment. + +## Objective + +The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In production deployment scenario, +streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure +data sources throughout the life of the job. The current implementation supports running Flink cluster (Job Manager/Task Manager/Jobs) under the +context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster. + +## How Flink Security works +Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. +A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security +requirements (Kerberos, database based, SSL/TLS, custom etc.,). 
While satisfying the security requirements for all the connectors evolve over a period +of time but at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security. + +- Kafka (0.9) +- HDFS +- ZooKeeper + +Hadoop uses UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. Flink bootstrap implementation +(JM/TM/CLI) takes care of instantiating UGI with appropriate security credentials to establish necessary security context. + +Services like Kafka and ZooKeeper uses SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with platform-specific login +module *name* to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is +instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism. + +It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under specific user context i.e., if Hadoop security is enabled +then the Flink processes will be running under secure user account or else it will run as the OS login user account who starts Flink cluster. + +## Security Configurations + +Secure credentials can be supplied by adding below configuration elements to Flink configuration file: + +- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret. + +- `security.principal`: User principal name that the Flink cluster should run as. 
--- End diff -- The keytab file contains both principal and encrypted keys (password) > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. >
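The two settings quoted from the patched documentation would land in the Flink configuration file. A sketch with made-up values (the keytab path and principal below are hypothetical placeholders, not defaults):

```yaml
# flink-conf.yaml — illustrative values only; path and principal are
# placeholders for a site-specific Kerberos setup
security.keytab: /etc/security/keytabs/flink.keytab
security.principal: flink-user@EXAMPLE.COM
```

As the review comment notes, the keytab file itself already binds the principal to its encrypted keys, so the two settings must agree with each other.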
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73037620 --- Diff: docs/internals/flink_security.md --- @@ -0,0 +1,87 @@ +--- +title: "Flink Security" +# Top navigation +top-nav-group: internals +top-nav-pos: 10 +top-nav-title: Flink Security +--- + + +This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) +and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers +who plans to run Flink on a secure environment. + +## Objective + +The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In production deployment scenario, +streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure +data sources throughout the life of the job. The current implementation supports running Flink cluster (Job Manager/Task Manager/Jobs) under the +context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster. + +## How Flink Security works +Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. +A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security +requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolve over a period +of time but at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security. + +- Kafka (0.9) +- HDFS +- ZooKeeper + +Hadoop uses UserGroupInformation (UGI) class to manage security. 
UGI is a static implementation that takes care of handling Kerberos authentication. Flink bootstrap implementation +(JM/TM/CLI) takes care of instantiating UGI with appropriate security credentials to establish necessary security context. + +Services like Kafka and ZooKeeper uses SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with platform-specific login +module *name* to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is +instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism. + +It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under specific user context i.e., if Hadoop security is enabled +then the Flink processes will be running under secure user account or else it will run as the OS login user account who starts Flink cluster. + +## Security Configurations + +Secure credentials can be supplied by adding below configuration elements to Flink configuration file: + +- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret. + +- `security.principal`: User principal name that the Flink cluster should run as. --- End diff -- The keytab file contains both principal and encrypted keys (password)
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 @mxm - Thanks for your feedback and here is my response to some of your comments. - Do we need to run all the Yarn tests normally and secured? We already have problems with our test execution time. Perhaps we could have one dedicated test for secure setups and disable the other ones by default to run them manually if needed. [Vijay] - Yes, it is not essential to run the secure test case all the time as it consumes more cycles. Do you have any suggestion on controlling this through some mvn/surefire plugin configuration? - The testing code seems overly complicated using the custom JUnit Runner. I think we could achieve the same with @BeforeClass and @AfterClass methods in the secure IT cases. [Vijay] - It is little overhead but works out well with minimal changes to the code. We could revisit and make any changes if it creates any issues. - There is no dedicated test for the SecurityContext and the JaasConfiguration classes [Vijay] - Yes, will add UT for those classes. - It would be nice to add some documentation to the configuration web page. [Vijay] - I believe you are referring to the https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html. If so, yes it certainly helps and I will be happy to add the details but I don't have access to edit the page. - We should throw exceptions if the secure configuration is not complete instead of falling back to non-authenticated execution for either Hadoop or the Jaas configuration. Otherwise, users might end up with a partly secure environment. [Vijay] - Yes, will add the validation logic
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402626#comment-15402626 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 @mxm - Thanks for your feedback and here is my response to some of your comments. - Do we need to run all the Yarn tests normally and secured? We already have problems with our test execution time. Perhaps we could have one dedicated test for secure setups and disable the other ones by default to run them manually if needed. [Vijay] - Yes, it is not essential to run the secure test case all the time as it consumes more cycles. Do you have any suggestion on controlling this through some mvn/surefire plugin configuration? - The testing code seems overly complicated using the custom JUnit Runner. I think we could achieve the same with @BeforeClass and @AfterClass methods in the secure IT cases. [Vijay] - It is little overhead but works out well with minimal changes to the code. We could revisit and make any changes if it creates any issues. - There is no dedicated test for the SecurityContext and the JaasConfiguration classes [Vijay] - Yes, will add UT for those classes. - It would be nice to add some documentation to the configuration web page. [Vijay] - I believe you are referring to the https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html. If so, yes it certainly helps and I will be happy to add the details but I don't have access to edit the page. - We should throw exceptions if the secure configuration is not complete instead of falling back to non-authenticated execution for either Hadoop or the Jaas configuration. Otherwise, users might end up with a partly secure environment. 
[Vijay] - Yes, will add the validation logic > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. >
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402606#comment-15402606 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 @nielsbasjes - In most deployments, the KRB5 configuration file will be located in a well-known location (e.g., /etc/krb5.conf) but in scenarios where a custom location needs to be provided, we could pass the value through "-Djava.security.krb5.conf" > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. >
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 @nielsbasjes - In most deployments, the KRB5 configuration file will be located in a well-known location (e.g., /etc/krb5.conf) but in scenarios where a custom location needs to be provided, we could pass the value through "-Djava.security.krb5.conf"
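One hedged way to wire the system property Vijay mentions into a Flink deployment is via the `env.java.opts` setting, which forwards JVM options to the JobManager and TaskManager processes (the krb5.conf path below is a made-up example, not a default):

```yaml
# flink-conf.yaml — point the Flink JVMs at a non-default Kerberos
# configuration; /opt/secure/krb5.conf is a hypothetical path
env.java.opts: -Djava.security.krb5.conf=/opt/secure/krb5.conf
```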
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402585#comment-15402585 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73028461 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java --- @@ -75,34 +84,47 @@ public static void runYarnTaskManager(String[] args, final Class toks : UserGroupInformation.getCurrentUser().getTokens()) { - ugi.addToken(toks); + String keytabPath = null; + if(remoteKeytabPath != null) { + File f = new File(currDir, ConfigConstants.KEYTAB_FILE_NAME); --- End diff -- The name is not configurable (user provided) but we use a constant value. Is there any reason to keep the name unique? > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. >
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73028461 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java --- @@ -75,34 +84,47 @@ public static void runYarnTaskManager(String[] args, final Class toks : UserGroupInformation.getCurrentUser().getTokens()) { - ugi.addToken(toks); + String keytabPath = null; + if(remoteKeytabPath != null) { + File f = new File(currDir, ConfigConstants.KEYTAB_FILE_NAME); --- End diff -- The name is not configurable (user provided) but we use a constant value. Is there any reason to keep the name unique?
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402572#comment-15402572 ] Stephan Ewen commented on FLINK-4094: - If we pool the off-heap byte buffers, we effectively get behavior very similar to pre-allocation for the off-heap case. The crux here is that there is no point at which the released off-heap memory is reclaimed before new off-heap memory is allocated. The JVM has the parameter {{MaxDirectMemorySize}} to limit the direct memory, and force a garbage collection (releasing off-heap memory) when too much would otherwise be allocated. So, another option to fix this would be to set the {{MaxDirectMemorySize}} parameter properly. Other than that, we can only pool. We cannot really manually release the memory when freeing the segment, because the ByteBuffer wrapper object may still exist. Freeing the memory too early will result in segmentation faults. > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (the default) [1]. This can cause > the TaskManager process to be killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed its maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html
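The pooling idea described in the comment above can be sketched with a toy buffer pool. This is not Flink's `MemorySegment` code, just a minimal illustration of why pooling direct buffers behaves like pre-allocation: a released buffer is parked in the pool rather than returned to the OS, because its `ByteBuffer` wrapper may still be referenced elsewhere.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Toy sketch of a direct-buffer pool. Released buffers are never freed
// eagerly (that could cause use-after-free segfaults if a wrapper object
// is still alive); they are simply reused for the next allocation.
public class DirectBufferPool {

    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int segmentSize;

    public DirectBufferPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    public ByteBuffer allocate() {
        ByteBuffer pooled = free.poll();
        // Only touch the (GC-managed) native allocator when the pool is empty.
        return pooled != null ? pooled : ByteBuffer.allocateDirect(segmentSize);
    }

    public void release(ByteBuffer buffer) {
        buffer.clear();
        free.push(buffer); // parked for reuse, not returned to the OS
    }
}
```

The alternative mentioned in the comment is to cap direct memory with `-XX:MaxDirectMemorySize=...`, which makes the JVM trigger a garbage collection (and thus reclaim unreferenced direct buffers) before the limit would be exceeded.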
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402570#comment-15402570 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73027093 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/RunTypeSelectionRunner.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.model.FrameworkMethod; +import org.junit.runners.model.InitializationError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Custom JRunner used to run integration tests for Kafka in secure mode. Since the test code implementation uses + * abstract base code to run the MiniFlinkCluster and also start/stop Kafka brokers depending on the version (0.8/0.9) + * we need to know the run mode (CLEAR/SECURE) ahead of time (before the beforeClass). 
This Runner instance + * take care of setting SECURE flag on the holder class for secure testing to work seamless + * + */ +public class RunTypeSelectionRunner extends BlockJUnit4ClassRunner { --- End diff -- I agree with you that it will be nice to control this behavior in @BeforeClass but the challenge is controlling test run (secure vs insecure mode). The base class should be aware of the run mode (secure/insecure) ahead of time and I found it very challenging especially when the lifecycle of the services (MiniFlink, Kafka/ZK etc.,) are handled at different levels. I ended up in using custom JRunner which gives the flexibility in handling the run level from the top most layer (@Test class) without having to readjust the codebase much. > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73027093 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/RunTypeSelectionRunner.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.model.FrameworkMethod; +import org.junit.runners.model.InitializationError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Custom JRunner used to run integration tests for Kafka in secure mode. Since the test code implementation uses + * abstract base code to run the MiniFlinkCluster and also start/stop Kafka brokers depending on the version (0.8/0.9) + * we need to know the run mode (CLEAR/SECURE) ahead of time (before the beforeClass). 
This Runner instance + * takes care of setting the SECURE flag on the holder class so that secure testing works seamlessly + * + */ +public class RunTypeSelectionRunner extends BlockJUnit4ClassRunner { --- End diff -- I agree with you that it would be nice to control this behavior in @BeforeClass, but the challenge is controlling the test run mode (secure vs. insecure). The base class should be aware of the run mode ahead of time, and I found that very challenging, especially since the lifecycles of the services (MiniFlink, Kafka/ZK, etc.) are handled at different levels. I ended up using a custom JUnit runner, which gives the flexibility to handle the run mode from the topmost layer (the @Test class) without having to readjust the codebase much.
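The holder-plus-runner pattern debated above can be sketched minimally. A `BlockJUnit4ClassRunner` subclass would call the setter from its constructor, which JUnit invokes before any `@BeforeClass` method runs, so shared base classes can branch on the mode early enough. The sketch below shows only the holder side, with illustrative names (it is not the exact Flink test-utils class), since the runner itself needs JUnit on the classpath.

```java
// Hedged sketch of the run-mode holder discussed above. A custom JUnit
// runner would invoke set(...) in its constructor, which executes before
// the class-level @BeforeClass lifecycle of the test class it runs.
public class RunTypeHolder {

    public enum RunType { PLAIN, SECURE }

    private static RunType runType = RunType.PLAIN; // default: insecure run

    public static RunType get() {
        return runType;
    }

    public static void set(RunType type) {
        runType = type;
    }
}
```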
[jira] [Commented] (FLINK-2363) Add an end-to-end overview of program execution in Flink to the docs
[ https://issues.apache.org/jira/browse/FLINK-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402551#comment-15402551 ] ASF GitHub Bot commented on FLINK-2363: --- Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/913 > Add an end-to-end overview of program execution in Flink to the docs > > > Key: FLINK-2363 > URL: https://issues.apache.org/jira/browse/FLINK-2363 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Stephan Ewen >
[GitHub] flink pull request #913: [FLINK-2363] [docs] First part of internals - compl...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/913
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73023852 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/RunTypeHolder.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +public class RunTypeHolder { --- End diff -- Will add the desc --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402540#comment-15402540 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73023816 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java --- @@ -81,13 +90,21 @@ public static void prepare() throws IOException, ClassNotFoundException { LOG.info("-"); LOG.info("Starting KafkaTestBase "); LOG.info("-"); - + Configuration flinkConfig = new Configuration(); // dynamically load the implementation for the test Class clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); + LOG.info("Runtype: {}", RunTypeHolder.get()); + if(RunTypeHolder.get().equals(RunTypeHolder.RunType.SECURE) --- End diff -- Yes == is more appropriate > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402541#comment-15402541 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73023852 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/RunTypeHolder.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.util; + +public class RunTypeHolder { --- End diff -- Will add the desc > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73023816 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java --- @@ -81,13 +90,21 @@ public static void prepare() throws IOException, ClassNotFoundException { LOG.info("-"); LOG.info("Starting KafkaTestBase "); LOG.info("-"); - + Configuration flinkConfig = new Configuration(); // dynamically load the implementation for the test Class clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); + LOG.info("Runtype: {}", RunTypeHolder.get()); + if(RunTypeHolder.get().equals(RunTypeHolder.RunType.SECURE) --- End diff -- Yes == is more appropriate --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
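The `==` vs. `equals` point agreed on above is worth spelling out: enum constants are singletons, so reference comparison is correct, and it is also null-safe on the left-hand operand. A minimal illustration (class and method names are mine, not the PR's):

```java
public class EnumCompare {

    enum RunType { PLAIN, SECURE }

    // t == RunType.SECURE evaluates to false (rather than throwing) when
    // t is null, whereas t.equals(RunType.SECURE) would throw a
    // NullPointerException.
    static boolean isSecure(RunType t) {
        return t == RunType.SECURE;
    }
}
```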
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73023705 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/RunTypeHolder.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +public class RunTypeHolder { + + private static RunType runType = RunType.CLEAR; --- End diff -- Will change it to PLAIN --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402535#comment-15402535 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73023705 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/RunTypeHolder.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.util; + +public class RunTypeHolder { + + private static RunType runType = RunType.CLEAR; --- End diff -- Will change it to PLAIN > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4251) Add possibility for the RMQ Streaming Sink to customize the queue
[ https://issues.apache.org/jira/browse/FLINK-4251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402530#comment-15402530 ] ASF GitHub Bot commented on FLINK-4251: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2281 > Add possibility for the RMQ Streaming Sink to customize the queue > > > Key: FLINK-4251 > URL: https://issues.apache.org/jira/browse/FLINK-4251 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Philipp Grulich >Priority: Minor > > This patch adds the possibility for the user of the RabbitMQ > Streaming Sink to customize the queue which is used. > This adopts the behavior of [FLINK-4025] for the sink. > The commit doesn't change the actual behaviour but makes it > possible for users to override the `setupQueue` > method and customize their implementation. This was only possible for the > RMQSource before. The Sink and the Source now both offer the same > functionality, so this should increase usability. > [FLINK-4025] = https://issues.apache.org/jira/browse/FLINK-4025
[GitHub] flink pull request #2281: RMQ Sink: Possibility to customize queue config [F...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2281
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73020806 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java --- @@ -60,18 +64,33 @@ private static Properties standardProps; private static ForkableFlinkMiniCluster flink; + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + protected static Properties secureProps = new Properties(); + @BeforeClass public static void prepare() throws IOException, ClassNotFoundException { LOG.info("-"); LOG.info("Starting KafkaShortRetentionTestBase "); LOG.info("-"); + Configuration flinkConfig = new Configuration(); + // dynamically load the implementation for the test Class clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); + LOG.info("Runtype: {}", RunTypeHolder.get()); + if(RunTypeHolder.get().equals(RunTypeHolder.RunType.SECURE) + && kafkaServer.isSecureRunSupported()) { + SecureTestEnvironment.prepare(tempFolder); + SecureTestEnvironment.getSecurityEnabledFlinkConfiguration(flinkConfig); --- End diff -- No. The passed config object will be populated with keytab and principal security configurations. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
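The clarification above, that `getSecurityEnabledFlinkConfiguration(flinkConfig)` populates the configuration object it is handed rather than returning a new one, is an output-parameter convention that can be sketched as follows. The `Map` stand-in and the key names are invented for illustration; they are not Flink's real `Configuration` API.

```java
import java.util.Map;

// Sketch of an output-parameter style helper: the caller's configuration
// object is mutated in place with keytab and principal entries. Keys are
// illustrative placeholders.
public class SecuritySetupSketch {

    static void populateSecurityConfig(Map<String, String> flinkConfig,
                                       String keytabPath, String principal) {
        flinkConfig.put("security.keytab", keytabPath);
        flinkConfig.put("security.principal", principal);
    }
}
```

With this convention the caller's subsequent reads of `flinkConfig` see the security settings without any reassignment, which is why the method's return value can be ignored.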
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402485#comment-15402485 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73020806 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java --- @@ -60,18 +64,33 @@ private static Properties standardProps; private static ForkableFlinkMiniCluster flink; + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + protected static Properties secureProps = new Properties(); + @BeforeClass public static void prepare() throws IOException, ClassNotFoundException { LOG.info("-"); LOG.info("Starting KafkaShortRetentionTestBase "); LOG.info("-"); + Configuration flinkConfig = new Configuration(); + // dynamically load the implementation for the test Class clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); + LOG.info("Runtype: {}", RunTypeHolder.get()); + if(RunTypeHolder.get().equals(RunTypeHolder.RunType.SECURE) + && kafkaServer.isSecureRunSupported()) { + SecureTestEnvironment.prepare(tempFolder); + SecureTestEnvironment.getSecurityEnabledFlinkConfiguration(flinkConfig); --- End diff -- No. The passed config object will be populated with keytab and principal security configurations. 
> Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402457#comment-15402457 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73017860 --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java --- @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.fs; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.SecureTestEnvironment; +import org.apache.flink.test.util.TestingSecurityContext; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.NetUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; --- End diff -- Sorry, this might be from IntelliJ auto-import stuff. Will revert to use appropriate imports. > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. 
> -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73017860 --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java --- @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.fs; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.SecureTestEnvironment; +import org.apache.flink.test.util.TestingSecurityContext; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.NetUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; --- End diff -- Sorry, this might be from IntelliJ auto-import stuff. Will revert to use appropriate imports. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4297) Yarn client can't determine fat jar location if path contains spaces
[ https://issues.apache.org/jira/browse/FLINK-4297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402439#comment-15402439 ] ASF GitHub Bot commented on FLINK-4297: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2320 It didn't fix it unfortunately. > Yarn client can't determine fat jar location if path contains spaces > > > Key: FLINK-4297 > URL: https://issues.apache.org/jira/browse/FLINK-4297 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0, 1.2.0 > > > The code that automatically determines the fat jar path through the > ProtectionDomain of the Yarn class, receives a possibly URL encoded path > string. We need to decode using the system locale encoding, otherwise we can > receive errors of the following when spaces are in the file path: > {noformat} > Caused by: java.io.FileNotFoundException: File > file:/Users/max/Downloads/release-testing/flink-1.1.0-rc1/flink-1.1.0/build%20target/lib/flink-dist_2.11-1.1.0.jar > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) > at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337) > at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289) > at > org.apache.hadoop.fs.LocalFileSystem.copyFromLocalFile(LocalFileSystem.java:82) > at > org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1836) > at org.apache.flink.yarn.Utils.setupLocalResource(Utils.java:129) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:616) > at > 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:365) > ... 6 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
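The fix under review in #2320 boils down to URL-decoding the jar path before handing it to the filesystem, so that `%20` becomes a space again. A minimal self-contained sketch (the class name `FatJarPathDecoder` is hypothetical; the issue suggests decoding with the system locale encoding, while UTF-8 is used here for determinism):

```java
import java.net.URLDecoder;

public class FatJarPathDecoder {

    // Decodes percent-escapes (e.g. "%20") left in the path obtained from
    // the ProtectionDomain's code-source URL. Note that URLDecoder also
    // turns '+' into a space, which is tolerable here only because file
    // paths coming from a URL encode spaces as %20.
    static String decode(String rawPath) throws Exception {
        return URLDecoder.decode(rawPath, "UTF-8");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(decode(
            "/Users/max/release-testing/build%20target/lib/flink-dist_2.11-1.1.0.jar"));
    }
}
```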
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402438#comment-15402438 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73015565 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import java.io.File; +import java.lang.reflect.Method; +import java.security.PrivilegedExceptionAction; +/* + * Process-wide security context object which initializes UGI with appropriate security credentials and also it + * creates in-memory JAAS configuration object which will serve appropriate ApplicationConfigurationEntry for the + * connector login module implementation that authenticates Kerberos identity using SASL/JAAS based mechanism. + */ +@Internal +public class SecurityContext { + + private static final Logger LOG = LoggerFactory.getLogger(SecurityContext.class); + + private static SecurityContext installedContext; + + public static SecurityContext getInstalled() { return installedContext; } + + private UserGroupInformation ugi; + + SecurityContext(UserGroupInformation ugi) { + this.ugi = ugi; + } + + public T runSecured(final FlinkSecuredRunner runner) throws Exception { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public T run() throws Exception { + return runner.run(); + } + }); + } + + public static void install(SecurityConfiguration config) throws Exception { + + // perform static initialization of UGI, JAAS + if(installedContext != null) { + LOG.warn("overriding previous security context"); + } + + // establish the JAAS config + JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); + javax.security.auth.login.Configuration.setConfiguration(jaasConfig); + + //hack since Kafka Login Handler explicitly look for the property or else it throws an 
exception + //https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289 + System.setProperty("java.security.auth.login.config", ""); + + // establish the UGI login user + UserGroupInformation.setConfiguration(config.hadoopConf); + UserGroupInformation loginUser; + if(UserGroupInformation.isSecurityEnabled() && config.keytab != null && !Preconditions.isNullOrEmpty(config.principal)) { --- End diff -- Sure. Will handle it in JAASConfiguration.java as that looks more appropriate place. > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a
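The `runSecured`/`doAs` callback pattern in the `SecurityContext` diff above can be sketched in a self-contained way. Since Hadoop's `UserGroupInformation` is not assumed on the classpath here, a plain JAAS `Subject` stands in for it; the class and interface names are hypothetical:

```java
import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;

public class SecuredRunnerSketch {

    // Stand-in for the FlinkSecuredRunner callback in the diff.
    interface FlinkSecuredRunner<T> {
        T run() throws Exception;
    }

    // Subject.doAs mirrors UserGroupInformation.doAs: the action executes
    // with the subject's credentials bound to the access-control context.
    static <T> T runSecured(Subject subject, final FlinkSecuredRunner<T> runner) throws Exception {
        return Subject.doAs(subject, new PrivilegedExceptionAction<T>() {
            @Override
            public T run() throws Exception {
                return runner.run();
            }
        });
    }

    public static void main(String[] args) throws Exception {
        // All "secured" work is handed in as a callback.
        String result = runSecured(new Subject(), () -> "job submitted");
        System.out.println(result);
    }
}
```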
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402428#comment-15402428 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73014755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import java.io.File; +import java.lang.reflect.Method; +import java.security.PrivilegedExceptionAction; +/* + * Process-wide security context object which initializes UGI with appropriate security credentials and also it + * creates in-memory JAAS configuration object which will serve appropriate ApplicationConfigurationEntry for the + * connector login module implementation that authenticates Kerberos identity using SASL/JAAS based mechanism. + */ +@Internal +public class SecurityContext { + + private static final Logger LOG = LoggerFactory.getLogger(SecurityContext.class); + + private static SecurityContext installedContext; + + public static SecurityContext getInstalled() { return installedContext; } + + private UserGroupInformation ugi; + + SecurityContext(UserGroupInformation ugi) { + this.ugi = ugi; + } + + public T runSecured(final FlinkSecuredRunner runner) throws Exception { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public T run() throws Exception { + return runner.run(); + } + }); + } + + public static void install(SecurityConfiguration config) throws Exception { + + // perform static initialization of UGI, JAAS + if(installedContext != null) { + LOG.warn("overriding previous security context"); + } + + // establish the JAAS config + JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); + javax.security.auth.login.Configuration.setConfiguration(jaasConfig); + + //hack since Kafka Login Handler explicitly look for the property or else it throws an 
exception + //https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289 + System.setProperty("java.security.auth.login.config", ""); --- End diff -- It was never set earlier and not really essential. We are adding it only because Kafka code implementation (09 branch) explicitly looks for this. > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink
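The workaround discussed here, setting `java.security.auth.login.config` to an empty string after installing a programmatic JAAS configuration, can be sketched as follows. Assumptions: per the thread, the empty value only needs to satisfy Kafka 0.9's existence check, and the anonymous `Configuration` below is a stand-in for the real `JaasConfiguration`:

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasBootstrapSketch {
    public static void main(String[] args) {
        // Install an in-memory JAAS configuration; the login modules will
        // consult this, not a file on disk.
        Configuration.setConfiguration(new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return null; // no login modules in this sketch
            }
        });
        // Work around Kafka 0.9's Login handler, which merely checks that
        // the property exists before proceeding.
        System.setProperty("java.security.auth.login.config", "");
        System.out.println(System.getProperty("java.security.auth.login.config").isEmpty());
    }
}
```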
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402414#comment-15402414 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73013795 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +/** + * + * JAAS configuration provider object that provides default LoginModule for various connectors that supports + * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class. 
+ * + * Different connectors uses different login module name to implement JAAS based authentication support. + * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the + * name to be "client". This sets onus on the Flink cluster administrator to configure/provide right + * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides + * a standard lookup to get the login module entry for the JAAS based authentication to work. + * + * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate. + * + * https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html;>Configuration + * + */ + +@Internal +public class JaasConfiguration extends Configuration { + + private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class); + + public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor"); + + public static final boolean IBM_JAVA; + + static { + IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM"); + } + + public JaasConfiguration(String keytab, String principal) { --- End diff -- I had this as package local earlier but the test framework code is making use of this class (extending) and hence I changed it to public constructor. 
> Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes.
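The lookup-name-agnostic approach described in the `JaasConfiguration` Javadoc above (answering "KafkaClient", "Client", and every other login-context name with the same keytab entry) can be sketched with a minimal `Configuration` subclass. The name `UniformJaasConfigSketch` is hypothetical, and the Krb5 login-module class is hard-coded for Oracle/OpenJDK here, whereas the real class selects the IBM variant via `KerberosUtil`:

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class UniformJaasConfigSketch extends Configuration {

    private final AppConfigurationEntry keytabEntry;

    public UniformJaasConfigSketch(String keytab, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("keyTab", keytab);
        options.put("principal", principal);
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("doNotPrompt", "true");
        options.put("refreshKrb5Config", "true");
        keytabEntry = new AppConfigurationEntry(
            "com.sun.security.auth.module.Krb5LoginModule",
            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
            options);
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        // Same keytab-based entry regardless of which login-context name a
        // connector asks for, so the admin need not know each name.
        return new AppConfigurationEntry[] { keytabEntry };
    }
}
```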
[jira] [Commented] (FLINK-4299) Show loss of job manager in Client
[ https://issues.apache.org/jira/browse/FLINK-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402416#comment-15402416 ] ASF GitHub Bot commented on FLINK-4299: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2322 > Show loss of job manager in Client > -- > > Key: FLINK-4299 > URL: https://issues.apache.org/jira/browse/FLINK-4299 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Ufuk Celebi >Assignee: Maximilian Michels > Fix For: 1.1.0, 1.2.0 > > > If the client looses the connection to a job manager and the job recovers > from this, the client will only print the job status as {{RUNNING}} again. It > is hard to actually notice that something went wrong and a job manager was > lost. > {code} > ... > 08/01/2016 14:35:43 Flat Map -> Sink: Unnamed(8/8) switched to RUNNING > 08/01/2016 14:35:43 Source: Custom Source(6/8) switched to RUNNING > <-- EVERYTHING'S RUNNING --> > 08/01/2016 14:40:40 Job execution switched to status RUNNING <--- JOB > MANAGER FAIL OVER > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to SCHEDULED > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to DEPLOYING > 08/01/2016 14:40:40 Source: Custom Source(2/8) switched to SCHEDULED > ... > {code} > After {{14:35:43}} everything is running and the client does not print any > execution state updates. When the job manager fails, the job will be > recovered and enter the running state again eventually (at 14:40:40), but the > user might never notice this. > I would like to improve on this by printing some messages about the state of > the job manager connection. For example, between {{14:35:43}} and > {{14:40:40}} it might say that the job manager connection was lost, a new one > established, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4299) Show loss of job manager in Client
[ https://issues.apache.org/jira/browse/FLINK-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-4299. -- Resolution: Fixed Fix Version/s: 1.2.0 Fixed in 4d988a9 (release-1.1), 7ea9c01 (master).
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402412#comment-15402412 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73013657 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java --- Yes, it makes sense to use log4j configuration to toggle JAAS debug logging option
[jira] [Closed] (FLINK-4296) Scheduler accepts more tasks than it has task slots available
[ https://issues.apache.org/jira/browse/FLINK-4296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-4296. -- Resolution: Fixed Fixed in fa3a3fc (release-1.1), 10642f7 (master). > Scheduler accepts more tasks than it has task slots available > - > > Key: FLINK-4296 > URL: https://issues.apache.org/jira/browse/FLINK-4296 > Project: Flink > Issue Type: Bug > Components: JobManager, TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.1.0, 1.2.0 > > > Flink's scheduler doesn't support queued scheduling but expects to find all > necessary task slots upon scheduling. If it does not it throws an error. Due > to some changes in the latest master, this seems to be broken. > Flink accepts jobs with {{parallelism > total number of task slots}}, > schedules and deploys tasks in all available task slots, and leaves the > remaining tasks lingering forever. > Easy to reproduce: > {code} > ./bin/flink run -p TASK_SLOTS+n > {code} > where {{TASK_SLOTS}} is the number of total task slots of the cluster and > {{n>=1}}. > Here, {{p=11}}, {{TASK_SLOTS=10}}: > {{bin/flink run -p 11 examples/batch/EnumTriangles.jar}} > {noformat} > Cluster configuration: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Executing EnumTriangles example with default edges data set. > Use --edges to specify file input. > Printing result to stdout. Use --output to specify output path. > Submitting job with JobID: cd0c0b4cbe25643d8d92558168cfc045. Waiting for job > completion. > 08/01/2016 12:12:12 Job execution switched to status RUNNING. 
> 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to SCHEDULED > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to DEPLOYING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to RUNNING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to FINISHED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(2/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(6/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(4/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(5/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(4/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to DEPLOYING > 
08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(9/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(9/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(5/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to DEPLOYING > 08/01/2016 12:12:12 Join(Join at main(EnumTriangles.java:114))(1/11) > switched to SCHEDULED > 08/01/2016 12:12:12 Join(Join at main(EnumTriangles.java:114))(1/11) > switched to DEPLOYING > 08/01/2016 12:12:12 Join(Join at main(EnumTriangles.java:114))(2/11) > switched to SCHEDULED > 08/01/2016 12:12:12 Join(Join at main(EnumTriangles.java:114))(2/11) > switched to DEPLOYING > 08/01/2016 12:12:12 Join(Join
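The failure mode reported above — tasks for the 11th subtask lingering in SCHEDULED forever on a 10-slot cluster — is what an up-front slot check prevents. A minimal sketch of such a guard (illustrative only; Flink's actual scheduler and the fix in fa3a3fc/10642f7 are more involved):

```java
// Illustrative guard, not Flink's scheduler code: without queued scheduling,
// a job whose parallelism exceeds the cluster's total task slots can never
// be fully deployed, so it should be rejected up front instead of leaving
// undeployable tasks in SCHEDULED state forever.
class SlotCheck {
    static void checkSlots(int requestedParallelism, int totalTaskSlots) {
        if (requestedParallelism > totalTaskSlots) {
            throw new IllegalStateException(
                    "Job parallelism " + requestedParallelism
                    + " exceeds available task slots " + totalTaskSlots);
        }
    }
}
```

With `TASK_SLOTS = 10`, `checkSlots(11, 10)` would fail the submission immediately rather than deploying only 10 of 11 subtasks.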
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73013657 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +/** + * + * JAAS configuration provider object that provides default LoginModule for various connectors that supports + * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class. + * + * Different connectors uses different login module name to implement JAAS based authentication support. + * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the + * name to be "client". 
This sets onus on the Flink cluster administrator to configure/provide right + * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides + * a standard lookup to get the login module entry for the JAAS based authentication to work. + * + * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate. + * + * See https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html + * + */ + +@Internal +public class JaasConfiguration extends Configuration { + + private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class); + + public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor"); + + public static final boolean IBM_JAVA; + + static { + IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM"); + } + + public JaasConfiguration(String keytab, String principal) { + + LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal); + + if(!Preconditions.isNullOrEmpty(keytab) && !Preconditions.isNullOrEmpty(principal)) { + + if(IBM_JAVA) { + keytabKerberosOptions.put("useKeytab", prependFileUri(keytab)); + keytabKerberosOptions.put("credsType", "both"); + } else { + keytabKerberosOptions.put("keyTab", keytab); + keytabKerberosOptions.put("doNotPrompt", "true"); + keytabKerberosOptions.put("useKeyTab", "true"); + keytabKerberosOptions.put("storeKey", "true"); + } + + keytabKerberosOptions.put("principal", principal); + keytabKerberosOptions.put("refreshKrb5Config", "true"); + keytabKerberosOptions.putAll(debugOptions); + + keytabKerberosAce = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + keytabKerberosOptions); + } + } + + private static final Map<String, String> debugOptions = new HashMap<>(); --- End diff -- Yes, it makes sense to use log4j configuration to toggle JAAS debug logging option --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
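The abstraction discussed in the diff above — one JAAS `Configuration` that answers for whatever login-context name a connector asks for ("Client" for ZooKeeper, "KafkaClient" for Kafka) — can be sketched in plain JDK terms as follows. This is a standalone illustration, not Flink's actual `JaasConfiguration`; the login module name and option keys are the standard Oracle/OpenJDK `Krb5LoginModule` ones and are assumed here:

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: a Configuration that returns the same keytab-based
// Krb5LoginModule entry regardless of the login-context name requested,
// so the administrator need not hand-write a JAAS file per connector.
class KeytabJaasConfiguration extends Configuration {

    private final AppConfigurationEntry keytabEntry;

    KeytabJaasConfiguration(String keytab, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("keyTab", keytab);
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("doNotPrompt", "true");
        options.put("principal", principal);
        options.put("refreshKrb5Config", "true");
        this.keytabEntry = new AppConfigurationEntry(
                // Oracle/OpenJDK module; IBM JVMs ship a different module,
                // which is why the real code branches on the vendor string.
                "com.sun.security.auth.module.Krb5LoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                options);
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        // Same entry whether the caller asks for "Client", "KafkaClient", etc.
        return new AppConfigurationEntry[] { keytabEntry };
    }
}
```

Installing it via `Configuration.setConfiguration(new KeytabJaasConfiguration(keytab, principal))` would then satisfy both Kafka's and ZooKeeper's lookups without a `java.security.auth.login.config` file.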
[jira] [Commented] (FLINK-4296) Scheduler accepts more tasks than it has task slots available
[ https://issues.apache.org/jira/browse/FLINK-4296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402408#comment-15402408 ] ASF GitHub Bot commented on FLINK-4296: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2321
[GitHub] flink pull request #2321: [FLINK-4296] Fixes failure reporting of consumer t...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2321 ---
[jira] [Commented] (FLINK-4297) Yarn client can't determine fat jar location if path contains spaces
[ https://issues.apache.org/jira/browse/FLINK-4297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402402#comment-15402402 ] ASF GitHub Bot commented on FLINK-4297: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2320 Thanks for trying it out @uce! There was an unrelated quoting issue in `yarn-session.sh`. I pushed another fix. > Yarn client can't determine fat jar location if path contains spaces > > > Key: FLINK-4297 > URL: https://issues.apache.org/jira/browse/FLINK-4297 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0, 1.2.0 > > > The code that automatically determines the fat jar path through the > ProtectionDomain of the Yarn class, receives a possibly URL encoded path > string. We need to decode using the system locale encoding, otherwise we can > receive errors of the following when spaces are in the file path: > {noformat} > Caused by: java.io.FileNotFoundException: File > file:/Users/max/Downloads/release-testing/flink-1.1.0-rc1/flink-1.1.0/build%20target/lib/flink-dist_2.11-1.1.0.jar > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) > at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337) > at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289) > at > org.apache.hadoop.fs.LocalFileSystem.copyFromLocalFile(LocalFileSystem.java:82) > at > org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1836) > at org.apache.flink.yarn.Utils.setupLocalResource(Utils.java:129) > at > 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:616) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:365) > ... 6 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2320: [FLINK-4297][yarn] decode URL encoded fat jar path
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2320 Thanks for trying it out @uce! There was an unrelated quoting issue in `yarn-session.sh`. I pushed another fix. ---
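The root cause described in FLINK-4297 above is that `ProtectionDomain.getCodeSource().getLocation()` yields a URL-encoded path, so a directory named `build target` arrives as `build%20target` and the file lookup fails. The fix is to decode the string before using it as a filesystem path. A minimal standalone sketch of that decoding step (illustrative; not the exact Flink patch):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

// Decode a URL-encoded jar path (e.g. from a class's ProtectionDomain)
// back into a real filesystem path, turning "%20" into a space.
class JarPathDecoder {
    static String decode(String rawPath) {
        try {
            return URLDecoder.decode(rawPath, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be supported by every JVM.
            throw new RuntimeException(e);
        }
    }
}
```

In the real code the raw path would come from something like `SomeClass.class.getProtectionDomain().getCodeSource().getLocation().getPath()`. Note one caveat of `URLDecoder`: it also maps `+` to a space, so paths containing a literal `+` need more careful handling.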
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73012870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +/** + * + * JAAS configuration provider object that provides default LoginModule for various connectors that supports + * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class. + * + * Different connectors uses different login module name to implement JAAS based authentication support. + * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the + * name to be "client". 
This sets onus on the Flink cluster administrator to configure/provide right + * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides + * a standard lookup to get the login module entry for the JAAS based authentication to work. + * + * HDFS connector will not be impacted with this configuration since it uses UGI based mechanism to authenticate. + * + * See https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html + * + */ + +@Internal +public class JaasConfiguration extends Configuration { + + private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class); + + public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor"); + + public static final boolean IBM_JAVA; + + static { + IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM"); + } + + public JaasConfiguration(String keytab, String principal) { + + LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal); + + if(!Preconditions.isNullOrEmpty(keytab) && !Preconditions.isNullOrEmpty(principal)) { + + if(IBM_JAVA) { + keytabKerberosOptions.put("useKeytab", prependFileUri(keytab)); + keytabKerberosOptions.put("credsType", "both"); + } else { + keytabKerberosOptions.put("keyTab", keytab); + keytabKerberosOptions.put("doNotPrompt", "true"); + keytabKerberosOptions.put("useKeyTab", "true"); + keytabKerberosOptions.put("storeKey", "true"); + } + + keytabKerberosOptions.put("principal", principal); + keytabKerberosOptions.put("refreshKrb5Config", "true"); + keytabKerberosOptions.putAll(debugOptions); + + keytabKerberosAce = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + keytabKerberosOptions); + } + } + + private static final Map<String, String> debugOptions = new HashMap<>(); + + private static final Map<String, String> kerberosCacheOptions = new HashMap<>(); + + private static final Map<String, String> keytabKerberosOptions = new HashMap<>(); + + private static final AppConfigurationEntry userKerberosAce; + + private AppConfigurationEntry
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402398#comment-15402398 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73012870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15402395#comment-15402395 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73012593 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java --- I have tested with OpenJDK (1.8.0_91) only. Since the code fragment is derived from the Hadoop UGI implementation, I assume it will work for IBM based JVMs :) > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature > Reporter: Eron Wright > Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ > Add support for a keytab credential to be associated with the Flink cluster, to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73012593 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +/** + * + * JAAS configuration provider object that provides default LoginModule for various connectors that supports + * JAAS/SASL based Kerberos authentication. The implementation is inspired from Hadoop UGI class. + * + * Different connectors uses different login module name to implement JAAS based authentication support. + * For example, Kafka expects the login module name to be "kafkaClient" whereas ZooKeeper expect the + * name to be "client". 
This puts the onus on the Flink cluster administrator to configure/provide the right + * JAAS config entries. To simplify this requirement, we have introduced this abstraction that provides + * a standard lookup to get the login module entry for the JAAS based authentication to work. + * + * The HDFS connector is not impacted by this configuration since it uses the UGI based mechanism to authenticate. + * + * See https://docs.oracle.com/javase/7/docs/api/javax/security/auth/login/Configuration.html + * + */ + +@Internal +public class JaasConfiguration extends Configuration { + + private static final Logger LOG = LoggerFactory.getLogger(JaasConfiguration.class); + + public static final String JAVA_VENDOR_NAME = System.getProperty("java.vendor"); + + public static final boolean IBM_JAVA; + + static { + IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM"); + } + + public JaasConfiguration(String keytab, String principal) { + + LOG.info("Initializing JAAS configuration instance. Parameters: {}, {}", keytab, principal); + + if(!Preconditions.isNullOrEmpty(keytab) && !Preconditions.isNullOrEmpty(principal)) { + + if(IBM_JAVA) { --- End diff -- I have tested with OpenJDK (1.8.0_91) only. Since the code fragment is derived from the Hadoop UGI implementation, I assume it will work for an IBM-based JVM :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
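The lookup described in the Javadoc above can be sketched as a small standalone `Configuration` subclass. This is an illustrative sketch, not the PR's actual implementation: the class name `KeytabJaasConfiguration` and the exact option set are assumptions. The point is that the same keytab-based Kerberos login module entry is returned for whatever login context name a connector asks for (e.g. Kafka's or ZooKeeper's).

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: serves one keytab-based Kerberos login entry under any name. */
public class KeytabJaasConfiguration extends Configuration {

    private final String keytab;
    private final String principal;

    public KeytabJaasConfiguration(String keytab, String principal) {
        this.keytab = keytab;
        this.principal = principal;
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        // Both "KafkaClient" (Kafka) and "Client" (ZooKeeper) resolve to the
        // same keytab-based entry, so the administrator need not hand-write
        // a JAAS file per connector.
        Map<String, Object> options = new HashMap<>();
        options.put("keyTab", keytab);
        options.put("principal", principal);
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        return new AppConfigurationEntry[] {
            new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                options)
        };
    }
}
```

A real implementation would additionally branch on the JVM vendor (the `IBM_JAVA` check in the diff), since IBM JVMs ship a differently named Kerberos login module.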
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402387#comment-15402387 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2275#discussion_r73012055 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -1016,6 +1016,23 @@ /** The environment variable name which contains the location of the lib folder */ public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR"; + // Security --- + + /** +* The config parameter defining security credentials required +* for securing Flink cluster. +*/ + + /** Keytab file key name to be used in flink configuration file */ + public static final String SECURITY_KEYTAB_KEY = "security.keytab"; + + /** Kerberos security principal key name to be used in flink configuration file */ + public static final String SECURITY_PRINCIPAL_KEY = "security.principal"; + + /** Keytab file name populated in YARN container */ + public static final String KEYTAB_FILE_NAME = "krb5.keytab"; --- End diff -- It is just a constant literal and not exposed in the Flink configuration file. 
> Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
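A minimal sketch of how such config keys might be read and validated; `Properties` stands in for Flink's `Configuration` class here, and `hasKeytabCredentials` is a hypothetical helper, not part of the PR:

```java
import java.util.Properties;

public class SecurityConfigExample {

    // Mirrors the constants discussed in the diff above.
    public static final String SECURITY_KEYTAB_KEY = "security.keytab";
    public static final String SECURITY_PRINCIPAL_KEY = "security.principal";

    /** Returns true only when both credential settings are present and non-empty. */
    public static boolean hasKeytabCredentials(Properties flinkConf) {
        String keytab = flinkConf.getProperty(SECURITY_KEYTAB_KEY);
        String principal = flinkConf.getProperty(SECURITY_PRINCIPAL_KEY);
        return keytab != null && !keytab.isEmpty()
            && principal != null && !principal.isEmpty();
    }
}
```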
[jira] [Commented] (FLINK-4299) Show loss of job manager in Client
[ https://issues.apache.org/jira/browse/FLINK-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402376#comment-15402376 ] ASF GitHub Bot commented on FLINK-4299: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2322 Yes, sir! :-) +1 to merge. > Show loss of job manager in Client > -- > > Key: FLINK-4299 > URL: https://issues.apache.org/jira/browse/FLINK-4299 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Ufuk Celebi >Assignee: Maximilian Michels > Fix For: 1.1.0 > > > If the client looses the connection to a job manager and the job recovers > from this, the client will only print the job status as {{RUNNING}} again. It > is hard to actually notice that something went wrong and a job manager was > lost. > {code} > ... > 08/01/2016 14:35:43 Flat Map -> Sink: Unnamed(8/8) switched to RUNNING > 08/01/2016 14:35:43 Source: Custom Source(6/8) switched to RUNNING > <-- EVERYTHING'S RUNNING --> > 08/01/2016 14:40:40 Job execution switched to status RUNNING <--- JOB > MANAGER FAIL OVER > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to SCHEDULED > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to DEPLOYING > 08/01/2016 14:40:40 Source: Custom Source(2/8) switched to SCHEDULED > ... > {code} > After {{14:35:43}} everything is running and the client does not print any > execution state updates. When the job manager fails, the job will be > recovered and enter the running state again eventually (at 14:40:40), but the > user might never notice this. > I would like to improve on this by printing some messages about the state of > the job manager connection. For example, between {{14:35:43}} and > {{14:40:40}} it might say that the job manager connection was lost, a new one > established, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4303) Add CEP examples
Timo Walther created FLINK-4303: --- Summary: Add CEP examples Key: FLINK-4303 URL: https://issues.apache.org/jira/browse/FLINK-4303 Project: Flink Issue Type: Improvement Components: CEP Reporter: Timo Walther Neither CEP Java nor CEP Scala contain a runnable example. The example on the website is also not runnable without adding some additional code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4297) Yarn client can't determine fat jar location if path contains spaces
[ https://issues.apache.org/jira/browse/FLINK-4297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402373#comment-15402373 ] ASF GitHub Bot commented on FLINK-4297: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2320 Tested this with Hadoop 2.4.1 and it works as expected with `bin/flink run -m yarn-cluster`: ``` 2016-08-01 16:32:06,214 INFO org.apache.flink.yarn.Utils - Copying from 2016-08-01 16:32:06,214 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/mxm/flink/flink-dist/target/flink-1.1-SNAPSHOT-bin/flink with spaces/conf/log4j.properties to gs://.../user/hadoop/.flink/application_1470041666362_0007/log4j.properties ... 08/01/2016 16:33:24 Job execution switched to status FINISHED. ``` But YARN sessions don't work. I get: ``` $ bin/yarn-session.sh Error: Could not find or load main class with ``` > Yarn client can't determine fat jar location if path contains spaces > > > Key: FLINK-4297 > URL: https://issues.apache.org/jira/browse/FLINK-4297 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0, 1.2.0 > > > The code that automatically determines the fat jar path through the > ProtectionDomain of the Yarn class, receives a possibly URL encoded path > string. 
We need to decode using the system locale encoding, otherwise we can > receive errors of the following when spaces are in the file path: > {noformat} > Caused by: java.io.FileNotFoundException: File > file:/Users/max/Downloads/release-testing/flink-1.1.0-rc1/flink-1.1.0/build%20target/lib/flink-dist_2.11-1.1.0.jar > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) > at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337) > at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289) > at > org.apache.hadoop.fs.LocalFileSystem.copyFromLocalFile(LocalFileSystem.java:82) > at > org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1836) > at org.apache.flink.yarn.Utils.setupLocalResource(Utils.java:129) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:616) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:365) > ... 6 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
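The fix described in the issue amounts to URL-decoding the path obtained from the `ProtectionDomain` before handing it to the file system. A minimal sketch (the class and method names are illustrative, and UTF-8 is assumed here rather than the system locale encoding mentioned above):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

public class JarPathDecoder {

    /**
     * Decodes a possibly URL-encoded path, e.g. one taken from
     * clazz.getProtectionDomain().getCodeSource().getLocation().getPath(),
     * so that "%20" becomes a space again.
     *
     * Caveat: URLDecoder also turns '+' into a space, which can corrupt
     * paths containing a literal '+'.
     */
    public static String decodePath(String rawPath) {
        try {
            return URLDecoder.decode(rawPath, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("UTF-8 is always supported", e);
        }
    }
}
```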
[jira] [Commented] (FLINK-4296) Scheduler accepts more tasks than it has task slots available
[ https://issues.apache.org/jira/browse/FLINK-4296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402370#comment-15402370 ] ASF GitHub Bot commented on FLINK-4296: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2321 +1 LGTM > Scheduler accepts more tasks than it has task slots available > - > > Key: FLINK-4296 > URL: https://issues.apache.org/jira/browse/FLINK-4296 > Project: Flink > Issue Type: Bug > Components: JobManager, TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.1.0, 1.2.0 > > > Flink's scheduler doesn't support queued scheduling but expects to find all > necessary task slots upon scheduling. If it does not it throws an error. Due > to some changes in the latest master, this seems to be broken. > Flink accepts jobs with {{parallelism > total number of task slots}}, > schedules and deploys tasks in all available task slots, and leaves the > remaining tasks lingering forever. > Easy to reproduce: > {code} > ./bin/flink run -p TASK_SLOTS+n > {code} > where {{TASK_SLOTS}} is the number of total task slots of the cluster and > {{n>=1}}. > Here, {{p=11}}, {{TASK_SLOTS=10}}: > {{bin/flink run -p 11 examples/batch/EnumTriangles.jar}} > {noformat} > Cluster configuration: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Executing EnumTriangles example with default edges data set. > Use --edges to specify file input. > Printing result to stdout. Use --output to specify output path. > Submitting job with JobID: cd0c0b4cbe25643d8d92558168cfc045. Waiting for job > completion. > 08/01/2016 12:12:12 Job execution switched to status RUNNING. 
> 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to SCHEDULED > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to DEPLOYING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to RUNNING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to FINISHED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(2/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(6/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(4/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(5/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(4/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to DEPLOYING > 
08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(9/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(9/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(5/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to DEPLOYING > 08/01/2016 12:12:12 Join(Join at main(EnumTriangles.java:114))(1/11) > switched to SCHEDULED > 08/01/2016 12:12:12 Join(Join at main(EnumTriangles.java:114))(1/11) > switched to DEPLOYING > 08/01/2016 12:12:12 Join(Join at main(EnumTriangles.java:114))(2/11) > switched to SCHEDULED > 08/01/2016 12:12:12 Join(Join at
[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners
[ https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402367#comment-15402367 ] Aljoscha Krettek commented on FLINK-4282: - Also, when you have day windows you need to offset because otherwise the days wouldn't align with the actual start of a day in your timezone. > Add Offset Parameter to WindowAssigners > --- > > Key: FLINK-4282 > URL: https://issues.apache.org/jira/browse/FLINK-4282 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek > > Currently, windows are always aligned to EPOCH, which basically means days > are aligned with GMT. This is somewhat problematic for people living in > different timezones. > And offset parameter would allow to adapt the window assigner to the timezone. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4302) Add JavaDocs to MetricConfig
[ https://issues.apache.org/jira/browse/FLINK-4302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402363#comment-15402363 ] Chesnay Schepler commented on FLINK-4302: - that's a good idea. We could also think about renaming the class to MetricProperties; should make it clearer to people what kind of object they are dealing with. > Add JavaDocs to MetricConfig > > > Key: FLINK-4302 > URL: https://issues.apache.org/jira/browse/FLINK-4302 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Ufuk Celebi > > {{MetricConfig}} has no comments at all. If you want to implement a custom > reporter and you want to implement its {{open}} method, a {{MetricConfig}} is > its argument. It will be helpful to add one class-level JavaDoc stating where > the config values are coming from etc. > [~Zentol] what do you think? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4299) Show loss of job manager in Client
[ https://issues.apache.org/jira/browse/FLINK-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402348#comment-15402348 ] ASF GitHub Bot commented on FLINK-4299: --- GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2322 [FLINK-4299] show loss of job manager in Client This prints a message when the leading JobManager changes after first connecting to a JobManager. Further, it prints a message when a connection to a JobManager has been established. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-4299 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2322.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2322 commit bcbbe52c0db15d30518c7e4d868950612d083dbf Author: Maximilian MichelsDate: 2016-08-01T16:15:56Z [FLINK-4299] show loss of job manager in Client This prints a message when the leading JobManager changes after first connecting to a JobManager. Further, it prints a message when a connection to a JobManager has been established. > Show loss of job manager in Client > -- > > Key: FLINK-4299 > URL: https://issues.apache.org/jira/browse/FLINK-4299 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Ufuk Celebi >Assignee: Maximilian Michels > Fix For: 1.1.0 > > > If the client looses the connection to a job manager and the job recovers > from this, the client will only print the job status as {{RUNNING}} again. It > is hard to actually notice that something went wrong and a job manager was > lost. > {code} > ... 
> 08/01/2016 14:35:43 Flat Map -> Sink: Unnamed(8/8) switched to RUNNING > 08/01/2016 14:35:43 Source: Custom Source(6/8) switched to RUNNING > <-- EVERYTHING'S RUNNING --> > 08/01/2016 14:40:40 Job execution switched to status RUNNING <--- JOB > MANAGER FAIL OVER > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to SCHEDULED > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to DEPLOYING > 08/01/2016 14:40:40 Source: Custom Source(2/8) switched to SCHEDULED > ... > {code} > After {{14:35:43}} everything is running and the client does not print any > execution state updates. When the job manager fails, the job will be > recovered and enter the running state again eventually (at 14:40:40), but the > user might never notice this. > I would like to improve on this by printing some messages about the state of > the job manager connection. For example, between {{14:35:43}} and > {{14:40:40}} it might say that the job manager connection was lost, a new one > established, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2322: [FLINK-4299] show loss of job manager in Client
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2322 @uce Is this what you had in mind?
[jira] [Commented] (FLINK-4289) Source files have executable flag set
[ https://issues.apache.org/jira/browse/FLINK-4289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402338#comment-15402338 ] Ufuk Celebi commented on FLINK-4289: Fixed in d13b825 (release-1.1). > Source files have executable flag set > - > > Key: FLINK-4289 > URL: https://issues.apache.org/jira/browse/FLINK-4289 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi >Priority: Minor > Fix For: 1.2.0 > > > Running {{find . -executable -type f}} lists the following source {{.java}} > files as executable: > {code} > ./flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java > ./flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java > ./flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java > ./flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java > ./flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java > ./flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java > ./flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java > ./flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java > ./flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java > ./flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java > ./flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java > 
./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java > ./flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
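A `find`/`chmod` one-liner along these lines can clear the spurious executable bits (shown on an illustrative `demo` tree rather than the Flink source root):

```shell
# Set up a sample tree with a spuriously-executable source file.
mkdir -p demo/src
echo 'class A {}' > demo/src/A.java
chmod +x demo/src/A.java

# Clear the executable bit on every .java file under demo/.
find demo -name '*.java' -type f -exec chmod a-x {} +
```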
[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners
[ https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402339#comment-15402339 ] Aljoscha Krettek commented on FLINK-4282: - For most timezones this is valid but there are some that have hour + 15 min, 30 min or 45 min differences to UTC 0: https://en.wikipedia.org/wiki/List_of_UTC_time_offsets For now, it sounds far fetched but for those cases you would also need to have the offset so that hours align correctly. If we need the offset to do "hourly, but starting at quarter past" then we also solve this case, however. > Add Offset Parameter to WindowAssigners > --- > > Key: FLINK-4282 > URL: https://issues.apache.org/jira/browse/FLINK-4282 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek > > Currently, windows are always aligned to EPOCH, which basically means days > are aligned with GMT. This is somewhat problematic for people living in > different timezones. > And offset parameter would allow to adapt the window assigner to the timezone. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
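An offset parameter would typically enter the window-start computation as below. This mirrors the usual epoch-alignment formula rather than any code committed for this issue, and the class name is illustrative:

```java
public class WindowStartExample {

    /**
     * Start of the size-{@code windowSize} window containing {@code timestamp},
     * with window boundaries shifted from epoch alignment by {@code offset}.
     * All values are in milliseconds. The extra "+ windowSize" keeps the
     * modulo non-negative for timestamps smaller than the offset.
     */
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
}
```

For day windows (86,400,000 ms), an offset of -3,600,000 ms aligns window boundaries with midnight in UTC+1; the same mechanism covers the 15/30/45-minute timezones mentioned above, since the offset is an arbitrary number of milliseconds.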
[jira] [Updated] (FLINK-4293) Malformatted Apache Headers
[ https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-4293: Summary: Malformatted Apache Headers (was: Malformatted Apache Haders) > Malformatted Apache Headers > --- > > Key: FLINK-4293 > URL: https://issues.apache.org/jira/browse/FLINK-4293 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.3 >Reporter: Stephan Ewen >Priority: Minor > > Several files contain this header: > {code} > /** > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > {code} > The correct header format should be: > {code} > /* > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. 
You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4296) Scheduler accepts more tasks than it has task slots available
[ https://issues.apache.org/jira/browse/FLINK-4296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402334#comment-15402334 ] ASF GitHub Bot commented on FLINK-4296: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2321 Good catch. The change and test look good to me! This was broken for a long time (since the initial refactoring of the network stack I think). It never surfaced, because most use cases and tests run with pipelined results. > Scheduler accepts more tasks than it has task slots available > - > > Key: FLINK-4296 > URL: https://issues.apache.org/jira/browse/FLINK-4296 > Project: Flink > Issue Type: Bug > Components: JobManager, TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.1.0, 1.2.0 > > > Flink's scheduler doesn't support queued scheduling but expects to find all > necessary task slots upon scheduling. If it does not it throws an error. Due > to some changes in the latest master, this seems to be broken. > Flink accepts jobs with {{parallelism > total number of task slots}}, > schedules and deploys tasks in all available task slots, and leaves the > remaining tasks lingering forever. > Easy to reproduce: > {code} > ./bin/flink run -p TASK_SLOTS+n > {code} > where {{TASK_SLOTS}} is the number of total task slots of the cluster and > {{n>=1}}. > Here, {{p=11}}, {{TASK_SLOTS=10}}: > {{bin/flink run -p 11 examples/batch/EnumTriangles.jar}} > {noformat} > Cluster configuration: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Executing EnumTriangles example with default edges data set. > Use --edges to specify file input. > Printing result to stdout. Use --output to specify output path. > Submitting job with JobID: cd0c0b4cbe25643d8d92558168cfc045. 
Waiting for job > completion. > 08/01/2016 12:12:12 Job execution switched to status RUNNING. > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to SCHEDULED > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to DEPLOYING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to RUNNING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to FINISHED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(2/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(6/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(4/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(5/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(4/11) switched to DEPLOYING > 08/01/2016 12:12:12 
GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(9/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(9/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(5/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to DEPLOYING > 08/01/2016 12:12:12 Join(Join at main(EnumTriangles.java:114))(1/11) > switched to SCHEDULED > 08/01/2016 12:12:12 Join(Join at
[GitHub] flink issue #2321: [FLINK-4296] Fixes failure reporting of consumer task sch...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2321 Good catch. The change and test look good to me! This was broken for a long time (since the initial refactoring of the network stack I think). It never surfaced, because most use cases and tests run with pipelined results. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4283) ExecutionGraphRestartTest fails
[ https://issues.apache.org/jira/browse/FLINK-4283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402332#comment-15402332 ] Till Rohrmann commented on FLINK-4283: -- I tried to reproduce this issue locally but I was not successful. Could you maybe attach the logs for these test cases with log level DEBUG to the issue, [~Zentol]? Maybe this helps to figure out the problem you're encountering. > ExecutionGraphRestartTest fails > --- > > Key: FLINK-4283 > URL: https://issues.apache.org/jira/browse/FLINK-4283 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.0 > Environment: Ubuntu 14.04 > W10 >Reporter: Chesnay Schepler > Labels: test-stability > > I encounter reliable failures for the following tests: > testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) > Time elapsed: 120.089 sec <<< FAILURE! > java.lang.AssertionError: expected: but was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155) > taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) > Time elapsed: 2.055 sec <<< FAILURE! 
> java.lang.AssertionError: expected: but was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180) > testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) > Time elapsed: 120.079 sec <<< FAILURE! > java.lang.AssertionError: expected: but was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4296) Scheduler accepts more tasks than it has task slots available
[ https://issues.apache.org/jira/browse/FLINK-4296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402308#comment-15402308 ] ASF GitHub Bot commented on FLINK-4296: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2321 [FLINK-4296] Fixes failure reporting of consumer task scheduling when producer has already finished This PR changes the failure behaviour such that the consumer task is failed instead of the producer task. The latter is problematic, since a finished producer task will simply swallow scheduling exceptions originating from scheduling the consumer task. This PR should also be merged in the release-1.1.0 branch. R @mxm. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixBatchScheduling Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2321.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2321 commit 6eaddb1d562117124e17d244aca69cd591bc9c54 Author: Till Rohrmann Date: 2016-08-01T16:05:14Z [FLINK-4296] Fixes failure reporting of consumer task scheduling when producer has already finished This PR changes the failure behaviour such that the consumer task is failed instead of the producer task. The latter is problematic, since a finished producer task will simply swallow scheduling exceptions originating from scheduling the consumer task. 
> Scheduler accepts more tasks than it has task slots available > - > > Key: FLINK-4296 > URL: https://issues.apache.org/jira/browse/FLINK-4296 > Project: Flink > Issue Type: Bug > Components: JobManager, TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.1.0, 1.2.0 > > > Flink's scheduler doesn't support queued scheduling but expects to find all > necessary task slots upon scheduling. If it does not it throws an error. Due > to some changes in the latest master, this seems to be broken. > Flink accepts jobs with {{parallelism > total number of task slots}}, > schedules and deploys tasks in all available task slots, and leaves the > remaining tasks lingering forever. > Easy to reproduce: > {code} > ./bin/flink run -p TASK_SLOTS+n > {code} > where {{TASK_SLOTS}} is the number of total task slots of the cluster and > {{n>=1}}. > Here, {{p=11}}, {{TASK_SLOTS=10}}: > {{bin/flink run -p 11 examples/batch/EnumTriangles.jar}} > {noformat} > Cluster configuration: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Executing EnumTriangles example with default edges data set. > Use --edges to specify file input. > Printing result to stdout. Use --output to specify output path. > Submitting job with JobID: cd0c0b4cbe25643d8d92558168cfc045. Waiting for job > completion. > 08/01/2016 12:12:12 Job execution switched to status RUNNING. 
> 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to SCHEDULED > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to DEPLOYING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to RUNNING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to FINISHED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(2/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to DEPLOYING > 08/01/2016
[GitHub] flink pull request #2321: [FLINK-4296] Fixes failure reporting of consumer t...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2321 [FLINK-4296] Fixes failure reporting of consumer task scheduling when producer has already finished This PR changes the failure behaviour such that the consumer task is failed instead of the producer task. The latter is problematic, since a finished producer task will simply swallow scheduling exceptions originating from scheduling the consumer task. This PR should also be merged in the release-1.1.0 branch. R @mxm. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixBatchScheduling Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2321.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2321 commit 6eaddb1d562117124e17d244aca69cd591bc9c54 Author: Till Rohrmann Date: 2016-08-01T16:05:14Z [FLINK-4296] Fixes failure reporting of consumer task scheduling when producer has already finished This PR changes the failure behaviour such that the consumer task is failed instead of the producer task. The latter is problematic, since a finished producer task will simply swallow scheduling exceptions originating from scheduling the consumer task.
[jira] [Commented] (FLINK-4299) Show loss of job manager in Client
[ https://issues.apache.org/jira/browse/FLINK-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402305#comment-15402305 ] Maximilian Michels commented on FLINK-4299: --- Good idea! We should add some messages in the {{JobClientActor}} class upon receiving a new leader or disconnecting from an existing one. > Show loss of job manager in Client > -- > > Key: FLINK-4299 > URL: https://issues.apache.org/jira/browse/FLINK-4299 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Ufuk Celebi > Fix For: 1.1.0 > > > If the client loses the connection to a job manager and the job recovers > from this, the client will only print the job status as {{RUNNING}} again. It > is hard to actually notice that something went wrong and a job manager was > lost. > {code} > ... > 08/01/2016 14:35:43 Flat Map -> Sink: Unnamed(8/8) switched to RUNNING > 08/01/2016 14:35:43 Source: Custom Source(6/8) switched to RUNNING > <-- EVERYTHING'S RUNNING --> > 08/01/2016 14:40:40 Job execution switched to status RUNNING <--- JOB > MANAGER FAIL OVER > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to SCHEDULED > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to DEPLOYING > 08/01/2016 14:40:40 Source: Custom Source(2/8) switched to SCHEDULED > ... > {code} > After {{14:35:43}} everything is running and the client does not print any > execution state updates. When the job manager fails, the job will be > recovered and enter the running state again eventually (at 14:40:40), but the > user might never notice this. > I would like to improve on this by printing some messages about the state of > the job manager connection. For example, between {{14:35:43}} and > {{14:40:40}} it might say that the job manager connection was lost, a new one > established, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402291#comment-15402291 ] Stephan Ewen commented on FLINK-4280: - I like the latest suggestion for configuration, with a few possible changes: - Can we keep the Kafka08 and Kafka09 configuration style similar? - How about calling the offsets in ZK/Kafka the "Group Offsets", rather than "External Offsets"? Not sure everyone gets the distinction between internal offsets (Flink) and external ones (Kafka/ZK). One question I still have is whether it ever makes sense to commit to a different group than to start from. > New Flink-specific option to set starting position of Kafka consumer without > respecting external offsets in ZK / Broker > --- > > Key: FLINK-4280 > URL: https://issues.apache.org/jira/browse/FLINK-4280 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Currently, to start reading from the "earliest" and "latest" position in > topics for the Flink Kafka consumer, users set the Kafka config > {{auto.offset.reset}} in the provided properties configuration. > However, the way this config actually works might be a bit misleading if > users were trying to find a way to "read topics from a starting position". > The way the {{auto.offset.reset}} config works in the Flink Kafka consumer > resembles Kafka's original intent for the setting: first, existing external > offsets committed to the ZK / brokers will be checked; if none exists, then > {{auto.offset.reset}} will be respected. > I propose to add Flink-specific ways to define the starting position, without > taking into account the external offsets. The original behaviour (reference > external offsets first) can be changed to be a user option, so that the > behaviour can be retained for frequent Kafka users who may need some > collaboration with existing non-Flink Kafka consumer applications. 
> How users will interact with the Flink Kafka consumer after this is added, > with a newly introduced {{flink.starting-position}} config: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "earliest/latest"); > props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a > warning) > props.setProperty("group.id", "..."); // this won't have an effect on the > starting position anymore (may still be used in external offset committing) > ... > {code} > Or, reference external offsets in ZK / broker: > {code} > Properties props = new Properties(); > props.setProperty("flink.starting-position", "external-offsets"); > props.setProperty("auto.offset.reset", "earliest/latest"); // default will be > latest > props.setProperty("group.id", "..."); // will be used to look up external > offsets in ZK / broker on startup > ... > {code} > A thing we would need to decide on is what the default value should be for > {{flink.starting-position}}. > Two merits I see in adding this: > 1. This accommodates the way users generally interpret "read from a starting > position". As the Flink Kafka connector is essentially a > "high-level" Kafka consumer for Flink users, I think it is reasonable to add > Flink-specific functionality that users will find useful, although it wasn't > supported in Kafka's original consumer designs. > 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is > used only to expose progress to the outside world, and not used to manipulate > how Kafka topics are read in Flink (unless users opt to do so)" is even more > definite and solid. There was some discussion in this PR > (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I > think adding this further "decouples" Flink's internal offset checkpointing from > Kafka's external offset store. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4299) Show loss of job manager in Client
[ https://issues.apache.org/jira/browse/FLINK-4299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-4299: - Assignee: Maximilian Michels > Show loss of job manager in Client > -- > > Key: FLINK-4299 > URL: https://issues.apache.org/jira/browse/FLINK-4299 > Project: Flink > Issue Type: Improvement > Components: Client >Reporter: Ufuk Celebi >Assignee: Maximilian Michels > Fix For: 1.1.0 > > > If the client loses the connection to a job manager and the job recovers > from this, the client will only print the job status as {{RUNNING}} again. It > is hard to actually notice that something went wrong and a job manager was > lost. > {code} > ... > 08/01/2016 14:35:43 Flat Map -> Sink: Unnamed(8/8) switched to RUNNING > 08/01/2016 14:35:43 Source: Custom Source(6/8) switched to RUNNING > <-- EVERYTHING'S RUNNING --> > 08/01/2016 14:40:40 Job execution switched to status RUNNING <--- JOB > MANAGER FAIL OVER > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to SCHEDULED > 08/01/2016 14:40:40 Source: Custom Source(1/8) switched to DEPLOYING > 08/01/2016 14:40:40 Source: Custom Source(2/8) switched to SCHEDULED > ... > {code} > After {{14:35:43}} everything is running and the client does not print any > execution state updates. When the job manager fails, the job will be > recovered and enter the running state again eventually (at 14:40:40), but the > user might never notice this. > I would like to improve on this by printing some messages about the state of > the job manager connection. For example, between {{14:35:43}} and > {{14:40:40}} it might say that the job manager connection was lost, a new one > established, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4296) Scheduler accepts more tasks than it has task slots available
[ https://issues.apache.org/jira/browse/FLINK-4296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402303#comment-15402303 ] Till Rohrmann commented on FLINK-4296: -- I found the issue. The problem is that a scheduling failure of a consumer task in {{Execution.scheduleOrUpdateConsumer}} fails the current {{Execution}} and not the consuming {{Execution}}. If the current {{Execution}} is already in a final state (e.g. blocking data exchange), then the failure is simply ignored. I propose to fail the consuming {{Execution}} instead of the producing {{Execution}} in order to fail the job. > Scheduler accepts more tasks than it has task slots available > - > > Key: FLINK-4296 > URL: https://issues.apache.org/jira/browse/FLINK-4296 > Project: Flink > Issue Type: Bug > Components: JobManager, TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.1.0, 1.2.0 > > > Flink's scheduler doesn't support queued scheduling but expects to find all > necessary task slots upon scheduling. If it does not it throws an error. Due > to some changes in the latest master, this seems to be broken. > Flink accepts jobs with {{parallelism > total number of task slots}}, > schedules and deploys tasks in all available task slots, and leaves the > remaining tasks lingering forever. > Easy to reproduce: > {code} > ./bin/flink run -p TASK_SLOTS+n > {code} > where {{TASK_SLOTS}} is the number of total task slots of the cluster and > {{n>=1}}. > Here, {{p=11}}, {{TASK_SLOTS=10}}: > {{bin/flink run -p 11 examples/batch/EnumTriangles.jar}} > {noformat} > Cluster configuration: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Executing EnumTriangles example with default edges data set. > Use --edges to specify file input. 
> Printing result to stdout. Use --output to specify output path. > Submitting job with JobID: cd0c0b4cbe25643d8d92558168cfc045. Waiting for job > completion. > 08/01/2016 12:12:12 Job execution switched to status RUNNING. > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to SCHEDULED > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to DEPLOYING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to RUNNING > 08/01/2016 12:12:12 CHAIN DataSource (at > getDefaultEdgeDataSet(EnumTrianglesData.java:57) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at > main(EnumTriangles.java:108))(1/1) switched to FINISHED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(2/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(7/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(6/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(4/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(5/11) switched to SCHEDULED > 
08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(4/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(3/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(9/11) switched to SCHEDULED > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(9/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(5/11) switched to DEPLOYING > 08/01/2016 12:12:12 GroupReduce (GroupReduce at > main(EnumTriangles.java:112))(1/11) switched to DEPLOYING > 08/01/2016 12:12:12
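Till's diagnosis above can be illustrated with a toy model. Note this is a hypothetical simplification, not Flink's actual {{Execution}} class: the point is only that a task in a final state silently ignores failure reports, so a consumer-scheduling error reported against an already-finished producer is swallowed, while the same error reported against the still-live consumer fails the job visibly.

```java
// Toy model of the FLINK-4296 bug (hypothetical classes, not Flink's API):
// a finished producer swallows failures, so scheduling errors must be
// reported against the consumer execution instead.
public class SchedulingFailureDemo {

    enum State { CREATED, RUNNING, FINISHED, FAILED }

    static class Execution {
        State state = State.CREATED;

        // Mirrors the behaviour described in the comment: once an execution
        // is in a final state, failure reports are silently dropped.
        void markFailed(Throwable t) {
            if (state == State.FINISHED || state == State.FAILED) {
                return; // final state: the failure is swallowed
            }
            state = State.FAILED;
        }
    }

    public static void main(String[] args) {
        Execution producer = new Execution();
        Execution consumer = new Execution();
        producer.state = State.FINISHED; // blocking result already produced

        Throwable noSlots = new RuntimeException("no task slots available");

        // Old behaviour: fail the producer -> nothing happens, job hangs.
        producer.markFailed(noSlots);
        System.out.println("producer: " + producer.state); // stays FINISHED

        // Fixed behaviour: fail the consumer -> the job fails visibly.
        consumer.markFailed(noSlots);
        System.out.println("consumer: " + consumer.state); // now FAILED
    }
}
```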
[jira] [Commented] (FLINK-4282) Add Offset Parameter to WindowAssigners
[ https://issues.apache.org/jira/browse/FLINK-4282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402301#comment-15402301 ] Stephan Ewen commented on FLINK-4282: - Seems the offset can serve two purposes: 1. Compensate for timezones. This should only be relevant, though, if the event's timestamps are assigned based on UTC / EPOCH, but the window result should not be. Should we actually provide a "target timezone" for that, rather than an offset? 2. To create windows like "hourly, but starting at quarter past". Here, an offset would make sense (smaller than the window size, though). > Add Offset Parameter to WindowAssigners > --- > > Key: FLINK-4282 > URL: https://issues.apache.org/jira/browse/FLINK-4282 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek > > Currently, windows are always aligned to EPOCH, which basically means days > are aligned with GMT. This is somewhat problematic for people living in > different timezones. > An offset parameter would allow adapting the window assigner to the timezone. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
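Both purposes discussed above reduce to the same alignment arithmetic. A minimal sketch (the {{windowStart}} helper is an assumption for illustration, not Flink's API at the time of this discussion): shift the window boundaries by an offset before snapping a timestamp down to its window start.

```java
// Sketch of offset-based window alignment (hypothetical helper, not Flink's
// API as of this discussion).
public class WindowOffsetDemo {

    // Start of the window containing `timestamp`, with window boundaries
    // shifted by `offset` milliseconds relative to EPOCH. The double modulo
    // keeps the result correct for negative offsets (timezones east of UTC).
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp
            - (((timestamp - offset) % windowSize) + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;
        long day = 24 * hour;

        // Purpose 2: hourly windows starting at quarter past.
        // 00:50 falls into the window [00:15, 01:15).
        System.out.println(windowStart(50 * minute, 15 * minute, hour)); // 900000 = 00:15

        // Purpose 1: daily windows aligned to local midnight in UTC+05:45
        // (a fractional offset from the list Aljoscha links). Epoch (00:00
        // UTC) falls into the local day starting at 18:15 UTC the day before.
        System.out.println(windowStart(0L, -(5 * hour + 45 * minute), day)); // -20700000
    }
}
```

Whether the parameter is exposed as a raw offset or as a "target timezone" is then just a question of who computes the offset; the assignment math stays the same.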
[jira] [Commented] (FLINK-4154) Correction of murmur hash breaks backwards compatibility
[ https://issues.apache.org/jira/browse/FLINK-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402300#comment-15402300 ] Ufuk Celebi commented on FLINK-4154: Reverted in 0274f9c (release-1.1) and 0f92a6b (master). > Correction of murmur hash breaks backwards compatibility > > > Key: FLINK-4154 > URL: https://issues.apache.org/jira/browse/FLINK-4154 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Greg Hogan >Priority: Blocker > Fix For: 1.1.0 > > > The correction of Flink's murmur hash with commit [1] breaks Flink's > backwards compatibility with respect to savepoints. The reason is that the > changed murmur hash, which is used to partition elements in a {{KeyedStream}}, > changes the mapping from keys to sub tasks. This changes the assigned key > spaces for a sub task. Consequently, an old savepoint (version 1.0) assigns > states with a different key space to the sub tasks. > I think that this must be fixed for the upcoming 1.1 release. I see two > options to solve the problem: > - revert the changes, but then we don't know how the flawed murmur hash > performs > - develop tooling to repartition state of old savepoints. This is probably > not trivial since a keyed stream can also contain non-partitioned state which > is not partitionable in all cases. And even if only partitioned state is > used, we would need some kind of special operator which can repartition the > state wrt the key. > I think that the latter option requires some more thought and is thus > unlikely to be done before the 1.1 release. Therefore, as a workaround, I > think that we should revert the murmur hash changes. > [1] > https://github.com/apache/flink/commit/641a0d436c9b7a34ff33ceb370cf29962cac4dee -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2237) Add hash-based Aggregation
[ https://issues.apache.org/jira/browse/FLINK-2237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402281#comment-15402281 ] Stephan Ewen commented on FLINK-2237: - If it is just about the fact that it is currently called {{CombineHint}}, then let's change the name. It is still in "beta" status, so it should be changeable. I would not make the API more confusing just because of the hint's current name. > Add hash-based Aggregation > -- > > Key: FLINK-2237 > URL: https://issues.apache.org/jira/browse/FLINK-2237 > Project: Flink > Issue Type: New Feature >Reporter: Rafiullah Momand >Assignee: Gabor Gevay >Priority: Minor > > Aggregation functions at the moment are implemented in a sort-based way. > How can we implement hash-based aggregation for Flink? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4302) Add JavaDocs to MetricConfig
Ufuk Celebi created FLINK-4302: -- Summary: Add JavaDocs to MetricConfig Key: FLINK-4302 URL: https://issues.apache.org/jira/browse/FLINK-4302 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.1.0 Reporter: Ufuk Celebi {{MetricConfig}} has no comments at all. If you implement a custom reporter, its {{open}} method takes a {{MetricConfig}} as its argument. It would be helpful to add a class-level JavaDoc stating where the config values come from, etc. [~Zentol] what do you think? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4301) Parameterize Flink version in Quickstart bash script
[ https://issues.apache.org/jira/browse/FLINK-4301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-4301: Description: The Flink version is hard coded in the quickstart script (for Scala and Java). Thus, even if a user is in the Flink 1.0 docs the scripts are producing a quickstart of the Flink 1.1 release. It would be better if the one-liner in the documentation would contain the version such that a Flink 1.0 quickstart is built in the 1.0 documentation and 1.1 quickstart in the 1.1 documentation. (was: The Flink version is hard coded in the quickstart script (for Scala and Java). Thus, even if a user is in the Flink 1.0 docs the scripts are producing a quickstart of the Flink 1.1 release. It would be better if the one-liner in the documentation would contain the version such that a Flink 1.0 quickstart is build in the 1.0 documentation and 1.1 quickstart in the 1.1 documentation.) > Parameterize Flink version in Quickstart bash script > > > Key: FLINK-4301 > URL: https://issues.apache.org/jira/browse/FLINK-4301 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Timo Walther > Labels: starter > > The Flink version is hard coded in the quickstart script (for Scala and > Java). Thus, even if a user is in the Flink 1.0 docs the scripts are > producing a quickstart of the Flink 1.1 release. It would be better if the > one-liner in the documentation would contain the version such that a Flink > 1.0 quickstart is built in the 1.0 documentation and 1.1 quickstart in the > 1.1 documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4301) Parameterize Flink version in Quickstart bash script
[ https://issues.apache.org/jira/browse/FLINK-4301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-4301: Labels: starter (was: ) > Parameterize Flink version in Quickstart bash script > > > Key: FLINK-4301 > URL: https://issues.apache.org/jira/browse/FLINK-4301 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Timo Walther > Labels: starter > > The Flink version is hard coded in the quickstart script (for Scala and > Java). Thus, even if a user is in the Flink 1.0 docs the scripts are > producing a quickstart of the Flink 1.1 release. It would be better if the > one-liner in the documentation would contain the version such that a Flink > 1.0 quickstart is built in the 1.0 documentation and 1.1 quickstart in the > 1.1 documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4301) Parameterize Flink version in Quickstart bash script
Timo Walther created FLINK-4301: --- Summary: Parameterize Flink version in Quickstart bash script Key: FLINK-4301 URL: https://issues.apache.org/jira/browse/FLINK-4301 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Timo Walther The Flink version is hard coded in the quickstart script (for Scala and Java). Thus, even if a user is in the Flink 1.0 docs the scripts are producing a quickstart of the Flink 1.1 release. It would be better if the one-liner in the documentation would contain the version such that a Flink 1.0 quickstart is built in the 1.0 documentation and a 1.1 quickstart in the 1.1 documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
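A minimal sketch of the proposed parameterization. This assumes the quickstart one-liner ultimately wraps {{mvn archetype:generate}}; the function name and the defaulting behavior are illustrative, not the actual script:

```shell
#!/bin/sh
# Sketch (assumption): build the quickstart command for a given Flink version,
# so the 1.0 docs can pass 1.0.x and the 1.1 docs can pass 1.1.x.
build_quickstart_cmd() {
  version="$1"   # e.g. 1.0.3, injected by the docs build
  type="$2"      # quickstart-java or quickstart-scala
  printf 'mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-%s -DarchetypeVersion=%s' "$type" "$version"
}

# Example: the documentation page would substitute its own version here.
build_quickstart_cmd "${1:-1.1.0}" "${2:-quickstart-java}"
echo
```

With this shape, the version string becomes an argument supplied by each documentation branch instead of a constant baked into the script.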
[jira] [Commented] (FLINK-4290) CassandraConnectorTest deadlocks
[ https://issues.apache.org/jira/browse/FLINK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402243#comment-15402243 ] Stephan Ewen commented on FLINK-4290: - Updating the test assumption to skip the Cassandra tests on all Java 7 builds. > CassandraConnectorTest deadlocks > > > Key: FLINK-4290 > URL: https://issues.apache.org/jira/browse/FLINK-4290 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Stephan Ewen >Priority: Critical > Attachments: jstack.txt > > > The {{CassandraConnectorTest}} encountered a full deadlock on my latest test > run. > Stack dump of the JVM is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work
[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402231#comment-15402231 ] Maximilian Michels commented on FLINK-4094: --- Thanks for elaborating on your thoughts [~ram_krish]! I was assuming that the memory segments would actually be managed by a pool regardless of whether preallocation is set to false or true. You're right that without preallocation, segments are requested and released as they are needed. This DOES NOT work for the offheap memory segments (at least not how it is implemented now). Two possible fixes come to my mind: 1) Implement proper clearing of the offheap memory segments while keeping the preallocation:false behavior: http://stackoverflow.com/a/8462690/2225100 2) Use pooling even with preallocation:false, which differs from preallocation:true only in that the memory is allocated lazily over time. I would say, let's go with 2), which should be easily solvable. {quote} I would rather say that it is better we do internal management of offheap buffers. We should create a pool from which the buffers are allocated and if the pool is of fixed size and we have requests for more buffers than the size of the pool we should allocate them onheap only. (if that is acceptable). {quote} In the case of offheap managed memory, the upper bound of the memory should be respected in the same way as for heap memory. No additional memory should be allocated on the heap instead. The user has to increase the memory size if the job fails. > Off heap memory deallocation might not properly work > > > Key: FLINK-4094 > URL: https://issues.apache.org/jira/browse/FLINK-4094 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.1.0 > > > A user reported that off-heap memory is not properly deallocated when setting > {{taskmanager.memory.preallocate:false}} (per default) [1]. 
This can cause > the TaskManager process being killed by the OS. > It should be possible to execute multiple batch jobs with preallocation > turned off. No longer used direct memory buffers should be properly garbage > collected so that the JVM process does not exceed it's maximum memory bounds. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
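Option 2) from the comment above can be sketched as follows. This is an illustration of the idea only, not Flink's {{MemoryManager}}; the class name and budget-exhausted behavior are assumptions. Segments are allocated lazily up to a fixed budget and returned to a pool on release instead of being handed back to the garbage collector, so the direct-memory footprint never exceeds the configured bound:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Illustrative sketch (assumption, not Flink's MemoryManager): off-heap
 *  segments are allocated lazily up to a fixed budget and pooled on release. */
class LazySegmentPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int segmentSize;
    private final int maxSegments;
    private int allocated = 0;

    LazySegmentPool(int segmentSize, int maxSegments) {
        this.segmentSize = segmentSize;
        this.maxSegments = maxSegments;
    }

    ByteBuffer request() {
        ByteBuffer segment = free.poll();
        if (segment != null) {
            return segment; // reuse a pooled segment, no new allocation
        }
        if (allocated >= maxSegments) {
            // respect the off-heap budget: fail instead of spilling onto the heap
            throw new IllegalStateException("off-heap memory budget exhausted");
        }
        allocated++;
        return ByteBuffer.allocateDirect(segmentSize); // lazy allocation
    }

    void release(ByteBuffer segment) {
        segment.clear();
        free.push(segment); // pooled, never released to the GC
    }

    int allocatedSegments() {
        return allocated;
    }
}
```

Because released direct buffers stay in the pool, the process never depends on the garbage collector to reclaim direct memory in time, which is the failure mode that can get the TaskManager killed by the OS.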
[jira] [Created] (FLINK-4300) Improve error message for different Scala versions of JM and Client
Timo Walther created FLINK-4300: --- Summary: Improve error message for different Scala versions of JM and Client Key: FLINK-4300 URL: https://issues.apache.org/jira/browse/FLINK-4300 Project: Flink Issue Type: Improvement Components: Client, JobManager Reporter: Timo Walther If a user runs a job (e.g. via RemoteEnvironment) with different Scala versions of JobManager and Client, the job is not executed and has no proper error message. The Client fails only with a meaningless warning: {code} 16:59:58,677 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:6123] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. {code} JobManager log only contains the following warning: {code} 2016-08-01 16:59:58,664 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@192.168.1.142:63372] has failed, address is now gated for [5000] ms. Reason is: [scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388]. {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4290) CassandraConnectorTest deadlocks
[ https://issues.apache.org/jira/browse/FLINK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402204#comment-15402204 ] Stephan Ewen commented on FLINK-4290: - Seems Cassandra does not work with Java 7, actually. We should include an assumption in the test. > CassandraConnectorTest deadlocks > > > Key: FLINK-4290 > URL: https://issues.apache.org/jira/browse/FLINK-4290 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Stephan Ewen >Priority: Critical > Attachments: jstack.txt > > > The {{CassandraConnectorTest}} encountered a full deadlock on my latest test > run. > Stack dump of the JVM is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4290) CassandraConnectorTest deadlocks
[ https://issues.apache.org/jira/browse/FLINK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402201#comment-15402201 ] Stephan Ewen commented on FLINK-4290: - Sorry, correcting myself. The command line was using Java 7; I changed it to Java 8 and now it works. > CassandraConnectorTest deadlocks > > > Key: FLINK-4290 > URL: https://issues.apache.org/jira/browse/FLINK-4290 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Stephan Ewen >Priority: Critical > Attachments: jstack.txt > > > The {{CassandraConnectorTest}} encountered a full deadlock on my latest test > run. > Stack dump of the JVM is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4290) CassandraConnectorTest deadlocks
[ https://issues.apache.org/jira/browse/FLINK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402179#comment-15402179 ] Stephan Ewen commented on FLINK-4290: - My environment is Ubuntu 14.4, Maven 3.0.5, Java 8 > CassandraConnectorTest deadlocks > > > Key: FLINK-4290 > URL: https://issues.apache.org/jira/browse/FLINK-4290 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Stephan Ewen >Priority: Critical > Attachments: jstack.txt > > > The {{CassandraConnectorTest}} encountered a full deadlock on my latest test > run. > Stack dump of the JVM is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2294: [FLINK-4265] [dataset api] Add a NoOpOperator
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2294 This would replace `Delegate` and is much simpler. I placed it in the Java API package but it would not be visible to users through the `DataSet` API. The ultimate goal is to implicitly reuse operations performing the same computation on the same input `DataSet`. The optimizer cannot do this without understanding the UDF configuration and logic. Two instances of a UDF may be * incompatible, in which case a new result must be computed * the same, in which case the old result can be reused * different but compatible, in which case the UDF can merge the configurations and the new, shared result replaces the old result Replacing the old result requires a wrapper. Using `ProxyFactory` in `Delegate` has limitations as documented in FLINK-4257. With a `NoOpOperator` we can perform the same function by appending and then ignoring a dummy POJO operator. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
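The wrapper-and-unwind mechanism described in the comment above can be sketched in isolation. These are made-up stand-in classes, not the actual `DataSet`, `NoOpOperator`, or `OperatorTranslation` from Flink; they only illustrate how a pass-through wrapper with a swappable input lets a shared result be replaced after the fact and then skipped during translation:

```java
import java.util.Objects;

/** Stand-in for org.apache.flink.api.java.DataSet (illustration only). */
class Dataset<T> { }

/** Sketch of a NoOpOperator-style wrapper: forwards to a replaceable input. */
class NoOp<T> extends Dataset<T> {
    private Dataset<T> input;

    NoOp(Dataset<T> input) {
        this.input = Objects.requireNonNull(input);
    }

    Dataset<T> getInput() {
        return input;
    }

    /** Swap in a new, shared result in place of the old one. */
    void setInput(Dataset<T> input) {
        this.input = Objects.requireNonNull(input);
    }
}

/** Sketch of the unwind step a translator would apply to skip the wrappers. */
class Translator {
    @SuppressWarnings("unchecked")
    static <T> Dataset<T> unwind(Dataset<T> op) {
        while (op instanceof NoOp) {
            op = ((NoOp<T>) op).getInput();
        }
        return op;
    }
}
```

The wrapper carries no logic of its own, which is why it can simply be unwound at translation time; the only state it holds is the reference that algorithms mutate when they merge compatible computations.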
[jira] [Commented] (FLINK-4290) CassandraConnectorTest deadlocks
[ https://issues.apache.org/jira/browse/FLINK-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402172#comment-15402172 ] Stephan Ewen commented on FLINK-4290: - Running it with maven (command line), I get {{java.lang.NoClassDefFoundError: Could not initialize class com.sun.jna.Native}} Running it in IntelliJ, it passes normally. > CassandraConnectorTest deadlocks > > > Key: FLINK-4290 > URL: https://issues.apache.org/jira/browse/FLINK-4290 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Stephan Ewen >Priority: Critical > Attachments: jstack.txt > > > The {{CassandraConnectorTest}} encountered a full deadlock on my latest test > run. > Stack dump of the JVM is attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4265) Add a NoOpOperator
[ https://issues.apache.org/jira/browse/FLINK-4265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15402174#comment-15402174 ] ASF GitHub Bot commented on FLINK-4265: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2294 This would replace `Delegate` and is much simpler. I placed it in the Java API package but it would not be visible to users through the `DataSet` API. The ultimate goal is to implicitly reuse operations performing the same computation on the same input `DataSet`. The optimizer cannot do this without understanding the UDF configuration and logic. Two instances of a UDF may be * incompatible, in which case a new result must be computed * the same, in which case the old result can be reused * different but compatible, in which case the UDF can merge the configurations and the new, shared result replaces the old result Replacing the old result requires a wrapper. Using `ProxyFactory` in `Delegate` has limitations as documented in FLINK-4257. With a `NoOpOperator` we can perform the same function by appending and then ignoring a dummy POJO operator. > Add a NoOpOperator > -- > > Key: FLINK-4265 > URL: https://issues.apache.org/jira/browse/FLINK-4265 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > One recent feature of Gelly is algorithms which detect duplicated or similar > computation which can be shared. My initial implementation could only reuse a > {{DataSet}} result. Before committing to Flink this was updated to use a > javassist {{ProxyFactory}} allowing configuration to be merged and results to > be replaced. There were some issues, as identified in FLINK-4257. With a > {{NoOpOperator}} we can remove the use of {{ProxyFactory}} and resolve the > identified issues. > This ticket adds a {{NoOpOperator}} which is unwound in > {{OperatorTranslation.translate}}. 
The {{NoOpOperator}} contains a > {{DataSet}} which is accessed by a getter and setter. -- This message was sent by Atlassian JIRA (v6.3.4#6332)