[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804049#comment-15804049 ]

ramkrishna.s.vasudevan commented on FLINK-2168:
-----------------------------------------------

[~fhueske] Can I work on this now? I can see that FLINK-3848 is done.

> Add HBaseTableSource
> --------------------
>
>                 Key: FLINK-2168
>                 URL: https://issues.apache.org/jira/browse/FLINK-2168
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 0.9
>            Reporter: Fabian Hueske
>            Assignee: Wilmer DAZA
>            Priority: Minor
>              Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}}
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
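[Editor's note] The projection push-down idea behind {{ProjectableTableSource}} can be sketched as follows. This is an illustrative Python sketch, not Flink's actual API; every class and method name below (`HBaseLikeTableSource`, `project_fields`, `scan`) is an invented stand-in.

```python
# Sketch of projection push-down: the planner hands the source the list
# of columns a query actually needs, and the source reads only those.

class ProjectableTableSource:
    def project_fields(self, field_indexes):
        """Return a copy of this source that reads only the given columns."""
        raise NotImplementedError


class HBaseLikeTableSource(ProjectableTableSource):
    def __init__(self, rows, field_names, selected=None):
        self._rows = rows                # stand-in for an HBase scan result
        self._field_names = field_names
        self._selected = selected if selected is not None else list(range(len(field_names)))

    def project_fields(self, field_indexes):
        # The original source is left untouched; the planner works on a copy.
        return HBaseLikeTableSource(self._rows, self._field_names, field_indexes)

    def scan(self):
        # Only the projected columns are materialized per row.
        return [tuple(row[i] for i in self._selected) for row in self._rows]


source = HBaseLikeTableSource([(1, "a", True), (2, "b", False)], ["id", "name", "flag"])
narrow = source.project_fields([0, 2])
print(narrow.scan())  # [(1, True), (2, False)]
```

For a column store like HBase this matters: skipping a projected-out column avoids reading its column family entirely.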
[GitHub] flink pull request #2977: [FLINK-5084] Replace Java Table API integration te...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2977#discussion_r94921927

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsValidationTest.scala ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.batch.table
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.util.CollectionDataSets
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.{TableEnvironment, ValidationException}
    +import org.junit._
    +
    +class SetOperatorsValidationTest {
    --- End diff --

    This class should be moved into the `validation` package. Can be done before merging.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[jira] [Commented] (FLINK-5084) Replace Java Table API integration tests by unit tests
[ https://issues.apache.org/jira/browse/FLINK-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804130#comment-15804130 ]

ASF GitHub Bot commented on FLINK-5084:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2977#discussion_r94921927

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsValidationTest.scala ---

    --- End diff --

    This class should be moved into the `validation` package. Can be done before merging.

> Replace Java Table API integration tests by unit tests
> ------------------------------------------------------
>
>                 Key: FLINK-5084
>                 URL: https://issues.apache.org/jira/browse/FLINK-5084
>             Project: Flink
>          Issue Type: Task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Priority: Minor
>
> The Java Table API is a wrapper on top of the Scala Table API.
> Instead of operating directly with Expressions like the Scala API, the Java
> API accepts a String parameter which is parsed into Expressions.
> We could therefore replace the Java Table API ITCases by tests that check
> that the parsing step produces a valid logical plan.
> This could be done by creating two {{Table}} objects for an identical query,
> once with the Scala Expression API and once with the Java String API, and
> comparing the logical plans of both {{Table}} objects. Basically something
> like the following:
> {code}
> val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
> val joinT1 = ds1.join(ds2).where('b === 'e).select('c, 'g)
> val joinT2 = ds1.join(ds2).where("b = e").select("c, g")
> val lPlan1 = joinT1.logicalPlan
> val lPlan2 = joinT2.logicalPlan
> Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
> {code}
[jira] [Commented] (FLINK-4148) incorrect calculation distance in QuadTree
[ https://issues.apache.org/jira/browse/FLINK-4148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804136#comment-15804136 ]

ASF GitHub Bot commented on FLINK-4148:
---------------------------------------

Github user Fokko commented on the issue:

    https://github.com/apache/flink/pull/2442

    Looks good, please merge. Should have been fixed long ago :-)

> incorrect calculation distance in QuadTree
> ------------------------------------------
>
>                 Key: FLINK-4148
>                 URL: https://issues.apache.org/jira/browse/FLINK-4148
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Alexey Diomin
>            Priority: Trivial
>         Attachments: 0001-FLINK-4148-incorrect-calculation-minDist-distance-in.patch
>
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/QuadTree.scala#L105
> Because {{EuclideanDistanceMetric}} extends {{SquaredEuclideanDistanceMetric}},
> we always take the first case and never reach the case for
> {{math.sqrt(minDist)}}. The correct order is to match
> {{EuclideanDistanceMetric}} first and {{SquaredEuclideanDistanceMetric}}
> after it.
> P.S.: Because {{EuclideanDistanceMetric}} is more expensive to compute and is
> the default {{DistanceMetric}}, this can cause some performance degradation
> for KNN with default parameters.
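[Editor's note] The match-ordering bug described above is language-agnostic; here is a minimal Python sketch of it. The class names mirror the Scala ones, but the distance logic is simplified to a single number, so this is an illustration of the ordering mistake, not of the QuadTree code itself.

```python
# FLINK-4148 in miniature: EuclideanDistanceMetric extends
# SquaredEuclideanDistanceMetric, so a type test against the superclass
# also matches every instance of the subclass. If the superclass case
# comes first, the math.sqrt branch is unreachable.
import math

class SquaredEuclideanDistanceMetric:
    pass

class EuclideanDistanceMetric(SquaredEuclideanDistanceMetric):
    pass

def min_dist_buggy(metric, squared):
    # Superclass checked first: always matches, sqrt is never applied.
    if isinstance(metric, SquaredEuclideanDistanceMetric):
        return squared
    elif isinstance(metric, EuclideanDistanceMetric):
        return math.sqrt(squared)

def min_dist_fixed(metric, squared):
    # Fix: the more specific subclass must be matched before its superclass.
    if isinstance(metric, EuclideanDistanceMetric):
        return math.sqrt(squared)
    elif isinstance(metric, SquaredEuclideanDistanceMetric):
        return squared

m = EuclideanDistanceMetric()
print(min_dist_buggy(m, 16.0))  # 16.0 -- wrong: sqrt was skipped
print(min_dist_fixed(m, 16.0))  # 4.0  -- correct
```

Scala pattern matching, Java `instanceof` chains, and Python `isinstance` ladders all share this rule: order cases from most specific to least specific.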
[jira] [Closed] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors
[ https://issues.apache.org/jira/browse/FLINK-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-5397.
-------------------------------

> Fail to deserialize savepoints in v1.1 when there exist missing fields in
> class serialization descriptors
> --------------------------------------------------------------------------
>
>                 Key: FLINK-5397
>                 URL: https://issues.apache.org/jira/browse/FLINK-5397
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Stefan Richter
>             Fix For: 1.2.0, 1.3.0
>
> To restore from savepoints written by previous versions, Flink now keeps all
> classes whose serialization has changed and puts them in a separate package
> ("migration").
> When deserializing old savepoints, Flink looks up the correct descriptors
> ({{ObjectStreamClass}}) for these classes instead of using the ones written
> in the serialized data. The implementation, however, is problematic when
> field descriptors are missing from the serialized data.
> When serializing an object, Java only writes the descriptors of the non-null
> fields. But when we look up class descriptors from the classes themselves,
> all fields are put into the descriptors. As a result, we deserialize the
> savepoints with incorrect descriptors, leading to serialization exceptions.
> A simple resolution is to update the name of the read descriptors via
> reflection, instead of substituting different descriptors.
[GitHub] flink pull request #3051: [FLINK-5399] Add more information to checkpoint re...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3051
[jira] [Closed] (FLINK-4890) FileInputFormatTest#testExcludeFiles fails on Windows OS
[ https://issues.apache.org/jira/browse/FLINK-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-4890.
-------------------------------

> FileInputFormatTest#testExcludeFiles fails on Windows OS
> --------------------------------------------------------
>
>                 Key: FLINK-4890
>                 URL: https://issues.apache.org/jira/browse/FLINK-4890
>             Project: Flink
>          Issue Type: Bug
>          Components: Batch Connectors and Input/Output Formats
>    Affects Versions: 1.1.3
>         Environment: Windows 10
>            Reporter: Chesnay Schepler
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0, 1.3.0
>
> Running the mentioned test leads to an exception:
> {code}
> Illegal char <:> at index 2: /C:/dev/cygwin64/tmp/junit3838395086498044255/another_file.bin
> java.nio.file.InvalidPathException: Illegal char <:> at index 2: /C:/dev/cygwin64/tmp/junit3838395086498044255/another_file.bin
>     at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182)
>     at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153)
>     at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77)
>     at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94)
>     at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255)
>     at java.nio.file.Paths.get(Paths.java:84)
>     at org.apache.flink.api.common.io.GlobFilePathFilter.filterPath(GlobFilePathFilter.java:95)
>     at org.apache.flink.api.common.io.FileInputFormat.acceptFile(FileInputFormat.java:644)
>     at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:600)
>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:476)
>     at org.apache.flink.api.common.io.FileInputFormatTest.testReadMultiplePatterns(FileInputFormatTest.java:362)
> {code}
> The problem is that we are given a Flink Path, which is then converted to a
> String and handed to the nio FileSystem. The passed path is thus /C:/...,
> which nio can't work with.
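[Editor's note] The underlying mismatch can be shown with a small, self-contained Python sketch: a Windows file rendered as a URI-style path carries a leading slash ("/C:/..."), which a native Windows path parser rejects. The normalization function below is an illustration of one possible fix, not the change actually committed in Flink; the example paths are taken from the stack trace above.

```python
# A URI-style absolute path on Windows looks like "/C:/dir/file.bin".
# Native Windows path APIs (like java.nio's WindowsPathParser) expect
# "C:/dir/file.bin". Stripping the leading slash when the remainder
# starts with a drive letter resolves the mismatch; POSIX paths pass
# through unchanged.
import re

_DRIVE = re.compile(r"^/[A-Za-z]:/")

def to_native_windows(path):
    if _DRIVE.match(path):
        return path[1:]  # "/C:/tmp/f.bin" -> "C:/tmp/f.bin"
    return path

print(to_native_windows("/C:/dev/cygwin64/tmp/another_file.bin"))
print(to_native_windows("/home/user/f.bin"))  # POSIX path: unchanged
```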
[jira] [Commented] (FLINK-5399) Add more information to checkpoint result of TriggerSavepointSuccess
[ https://issues.apache.org/jira/browse/FLINK-5399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804162#comment-15804162 ]

ASF GitHub Bot commented on FLINK-5399:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3051

> Add more information to checkpoint result of TriggerSavepointSuccess
> --------------------------------------------------------------------
>
>                 Key: FLINK-5399
>                 URL: https://issues.apache.org/jira/browse/FLINK-5399
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: shijinkui
>
> Add checkpointId and triggerTime to TriggerSavepointSuccess, so that the
> history of triggered checkpoints can be recorded outside the Flink system.
[jira] [Resolved] (FLINK-4890) FileInputFormatTest#testExcludeFiles fails on Windows OS
[ https://issues.apache.org/jira/browse/FLINK-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-4890.
---------------------------------
    Resolution: Fixed
    Fix Version/s: 1.3.0
                   1.2.0

Fixed in
- 1.2.0 via fb48c3b4cbc5a186cb7b812c8d05833c5852b385
- 1.3.0 via 15bd1f128f3d2ae1cad68e89355f9c5045b8e830
[GitHub] flink pull request #3053: [FLINK-5400] Add accessor to folding states in Run...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3053
[jira] [Commented] (FLINK-5400) Add accessor to folding states in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804161#comment-15804161 ]

ASF GitHub Bot commented on FLINK-5400:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3053

> Add accessor to folding states in RuntimeContext
> ------------------------------------------------
>
>                 Key: FLINK-5400
>                 URL: https://issues.apache.org/jira/browse/FLINK-5400
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> {{RuntimeContext}} currently does not provide accessors to folding states, so
> users cannot use folding states in their rich functions. We should provide
> the missing accessor.
[jira] [Resolved] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors
[ https://issues.apache.org/jira/browse/FLINK-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-5397.
---------------------------------
    Resolution: Fixed
    Fix Version/s: 1.3.0
                   1.2.0

Fixed in
- 1.2.0 via 3554c96d118a411906a22b1f1087de073617a4c7
- 1.3.0 via 09614cc82486b2682ba08876b47019cc604574ed
[jira] [Resolved] (FLINK-5390) input should be closed in finally block in YarnFlinkApplicationMasterRunner#loadJobGraph()
[ https://issues.apache.org/jira/browse/FLINK-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-5390.
---------------------------------
    Resolution: Fixed
    Fix Version/s: 1.3.0

Fixed via 9f7ad84abf7f1c33c4ee40be1eb0297a28a30f57

Thank you for the contribution!

> input should be closed in finally block in
> YarnFlinkApplicationMasterRunner#loadJobGraph()
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5390
>                 URL: https://issues.apache.org/jira/browse/FLINK-5390
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Assignee: Roman Maier
>            Priority: Minor
>             Fix For: 1.3.0
>
> {code}
> FileInputStream input = new FileInputStream(fp);
> ObjectInputStream obInput = new ObjectInputStream(input);
> jg = (JobGraph) obInput.readObject();
> input.close();
> {code}
> If readObject() throws an exception, input is left unclosed.
> A similar issue is in AbstractYarnClusterDescriptor#startAppMaster() around
> line 726.
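[Editor's note] The leak pattern above, and the fix, can be demonstrated in a few lines of Python. The helper names (`load_job_graph_leaky`, `FakeStream`, etc.) are invented for illustration; the point is only the control flow: a `close()` placed after the read is skipped when the read throws, while a `finally` block (or a try-with-resources / context manager) always runs.

```python
# The reported code closed the stream only on the success path:
def load_job_graph_leaky(open_stream, read_object):
    stream = open_stream()
    obj = read_object(stream)   # if this raises, stream is never closed
    stream.close()
    return obj

# The fix: close in a finally block so the stream is released either way.
def load_job_graph_safe(open_stream, read_object):
    stream = open_stream()
    try:
        return read_object(stream)
    finally:
        stream.close()          # runs even when read_object raises

class FakeStream:
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

def corrupt_read(_stream):
    raise ValueError("corrupt job graph")

s = FakeStream()
try:
    load_job_graph_safe(lambda: s, corrupt_read)
except ValueError:
    pass
print(s.closed)  # True
```

In Java the idiomatic equivalent is a try-with-resources statement, which closes the `ObjectInputStream` (and the underlying `FileInputStream`) automatically on both paths.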
[jira] [Closed] (FLINK-5390) input should be closed in finally block in YarnFlinkApplicationMasterRunner#loadJobGraph()
[ https://issues.apache.org/jira/browse/FLINK-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-5390.
-------------------------------
[jira] [Closed] (FLINK-5399) Add more information to checkpoint result of TriggerSavepointSuccess
[ https://issues.apache.org/jira/browse/FLINK-5399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-5399.
-------------------------------
[jira] [Closed] (FLINK-5400) Add accessor to folding states in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-5400.
-------------------------------
[jira] [Resolved] (FLINK-5400) Add accessor to folding states in RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-5400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-5400.
---------------------------------
    Resolution: Fixed
    Fix Version/s: 1.3.0

Fixed in d63f831a4b11bb927a8cc216b4901d9262e44053

Thank you for the contribution!
[jira] [Commented] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804080#comment-15804080 ]

Fabian Hueske commented on FLINK-2168:
--------------------------------------

Hi [~ram_krish], yes, that would be great. We should wait for FLINK-5280 as
well, but that issue is very close to being resolved. I'll assign the issue
to you.
[jira] [Updated] (FLINK-2168) Add HBaseTableSource
[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-2168:
---------------------------------
    Assignee: ramkrishna.s.vasudevan  (was: Wilmer DAZA)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804100#comment-15804100 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3039

    I just had a crazy thought. :-) What do you think about moving
    `getFieldNames()` and `getFieldIndicies()` into a separate trait /
    interface, maybe `DefinedFieldNames`?

> Extend TableSource to support nested data
> -----------------------------------------
>
>                 Key: FLINK-5280
>                 URL: https://issues.apache.org/jira/browse/FLINK-5280
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of flat
> rows. However, there are several storage formats for nested data that should
> be supported, such as Avro, JSON, Parquet, and ORC. The Table API and SQL can
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in
> Calcite's schema need to be extended to support nested data.
[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3039

    I just had a crazy thought. :-) What do you think about moving
    `getFieldNames()` and `getFieldIndicies()` into a separate trait /
    interface, maybe `DefinedFieldNames`?
[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/3039

    I don't think I got the idea :) Could you elaborate on it?
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804108#comment-15804108 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/3039

    I don't think I got the idea :) Could you elaborate on it?
[GitHub] flink pull request #3052: Swap the pattern matching order
Github user Fokko closed the pull request at:

    https://github.com/apache/flink/pull/3052
[GitHub] flink issue #3052: Swap the pattern matching order
Github user Fokko commented on the issue:

    https://github.com/apache/flink/pull/3052

    Alright, just rebased onto master. Looks like Travis is working again,
    good job! Cheers!
[GitHub] flink issue #2977: [FLINK-5084] Replace Java Table API integration tests by ...
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2977

    Thanks for the update and rebasing @mtunique! The PR is good to merge
    (one file should be moved, see comment above).
[jira] [Commented] (FLINK-5084) Replace Java Table API integration tests by unit tests
[ https://issues.apache.org/jira/browse/FLINK-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804138#comment-15804138 ]

ASF GitHub Bot commented on FLINK-5084:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2977

    Thanks for the update and rebasing @mtunique! The PR is good to merge
    (one file should be moved, see comment above).
[GitHub] flink issue #2442: [FLINK-4148] incorrect calculation minDist distance in Qu...
Github user Fokko commented on the issue:

    https://github.com/apache/flink/pull/2442

    Looks good, please merge. Should have been fixed long ago :-)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15804149#comment-15804149 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3039

    We remove the methods from `TableSource` and add them to an interface. If
    a table source does not implement the methods, we use the names provided
    by the `TypeInformation`. If the table source implements the methods, we
    use those names. The distinction is done in `TableSourceTable`.

    What do you think?
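[Editor's note] The proposed split can be sketched as follows. `DefinedFieldNames` and `TableSourceTable` are names taken from the discussion above; everything else (`TypeInfo`, `resolve_field_names`, the sample sources) is invented here purely to illustrate the fallback mechanism, and the sketch is Python rather than the Scala/Java of the actual Table API.

```python
# Field-name methods move out of the base TableSource into an optional
# interface. Registration code checks for that interface and falls back
# to the names carried by the type information when it is absent --
# mirroring the distinction the comment places in TableSourceTable.

class DefinedFieldNames:
    """Optional interface: a source that declares its own field names."""
    def get_field_names(self):
        raise NotImplementedError


class TypeInfo:
    """Stand-in for TypeInformation, which always knows default names."""
    def __init__(self, field_names):
        self.field_names = field_names


def resolve_field_names(source, type_info):
    if isinstance(source, DefinedFieldNames):
        return source.get_field_names()   # source-provided names win
    return type_info.field_names          # fallback: names from the type


class PlainSource:                        # does not implement the interface
    pass

class NamedSource(DefinedFieldNames):     # declares its own names
    def get_field_names(self):
        return ["user_id", "user_name"]


ti = TypeInfo(["f0", "f1"])
print(resolve_field_names(PlainSource(), ti))  # ['f0', 'f1']
print(resolve_field_names(NamedSource(), ti))  # ['user_id', 'user_name']
```

The design benefit is that simple sources need not implement naming methods at all, while sources with meaningful names can opt in without changing the base interface.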
[jira] [Resolved] (FLINK-5399) Add more information to checkpoint result of TriggerSavepointSuccess
[ https://issues.apache.org/jira/browse/FLINK-5399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5399. - Resolution: Implemented Assignee: shijinkui Fix Version/s: 1.3.0 Implemented in d156f8d73cdd152da924f1923a615374258d5015 Thank you for the contribution! > Add more information to checkpoint result of TriggerSavepointSuccess > > > Key: FLINK-5399 > URL: https://issues.apache.org/jira/browse/FLINK-5399 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: shijinkui >Assignee: shijinkui > Fix For: 1.3.0 > > > Add checkpointId and triggerTime to TriggerSavepointSuccess > We can record the history of trigger checkpoint out of Flink System.
[GitHub] flink pull request #3048: Clarified the import path of the Breeze DenseVecto...
Github user Fokko closed the pull request at: https://github.com/apache/flink/pull/3048
[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes
[ https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804274#comment-15804274 ] Timo Walther commented on FLINK-5319: - Would this change break binary backwards compatibility? I'm not sure if the community would accept this change only because of a Java bug that might be solved in future releases. > ClassCastException when reusing an inherited method reference as KeySelector > for different classes > -- > > Key: FLINK-5319 > URL: https://issues.apache.org/jira/browse/FLINK-5319 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Alexander Chermenin >Assignee: Timo Walther > > Code sample: > {code}static abstract class A { > int id; > A(int id) {this.id = id; } > int getId() { return id; } > } > static class B extends A { B(int id) { super(id % 3); } } > static class C extends A { C(int id) { super(id % 2); } } > private static B b(int id) { return new B(id); } > private static C c(int id) { return new C(id); } > /** > * Main method. > */ > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment environment = > StreamExecutionEnvironment.getExecutionEnvironment(); > B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new); > C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new); > DataStreamSource bStream = environment.fromElements(bs); > DataStreamSource cStream = environment.fromElements(cs); > bStream.keyBy((KeySelector) A::getId).print(); > cStream.keyBy((KeySelector) A::getId).print(); > environment.execute(); > } > {code} > This code throws next exception: > {code}Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127) > at > 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) > at >
[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 I don't think we will win a lot with this. Even if we remove these two methods from the `TableSource` trait interface, there is still the `getTypeIndices` method, and Java users will have to call it if they are going to implement a `TableSource` trait. And if a user knows how to inherit a trait with one method, he/she will be able to inherit a trait with three methods. The second problem with this approach is that it's not really object-oriented. We will have to rely on reflection tricks (probably sugared with pattern matching), while simply having three methods is a cleaner OO solution. What if we leave all three methods and simply add some base Java implementations that already implement these traits? Something like `JavaBatchTableSource`, `JavaStreamTableSource`, and `JavaBatchStreamTableSource`? Then users will not need to struggle with trait inheritance issues.
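The "Java base implementation" alternative floated in the comment above could look roughly like this. This is an illustrative sketch, not Flink code: only the `JavaBatchTableSource` name comes from the proposal, and the simplified types, method signatures, and defaults are invented for the example.

```java
// Sketch of the proposed Java-friendly base class. Everything except the
// JavaBatchTableSource name is a stand-in invented for illustration.

interface TableSource {
    String getReturnType();          // stand-in for Flink's TypeInformation
    String[] getFieldNames();
    int[] getFieldIndicies();
}

// Abstract base that pre-implements the two derivable methods, so a
// concrete Java source only has to supply the return type.
abstract class JavaBatchTableSource implements TableSource {
    // In the real API this would be derived from getReturnType().
    protected int arity() { return 2; }

    @Override
    public String[] getFieldNames() {
        String[] names = new String[arity()];
        for (int i = 0; i < names.length; i++) {
            names[i] = "f" + i;      // default positional field names
        }
        return names;
    }

    @Override
    public int[] getFieldIndicies() {
        int[] indices = new int[arity()];
        for (int i = 0; i < indices.length; i++) {
            indices[i] = i;
        }
        return indices;
    }
}

class MyCsvSource extends JavaBatchTableSource {
    @Override
    public String getReturnType() { return "Row(String, Int)"; }
}
```

Under this design a Java user writes only `getReturnType()` and inherits sensible defaults for the other two methods, at the cost of the extra `Java*TableSource` class hierarchy the comment mentions.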
[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3039 Which `getTypeIndicies` method are you referring to? `TableSource` only has `getReturnType`, `getFieldNames` and `getFieldIndicies`. If we move the latter two to a separate interface, only `getReturnType` is left. Also, I think this is typical OO design. We do not need reflection to check if an object implements an interface; that's a very common operation in Java and Scala. A simple `isInstanceOf[DefinesFieldNames]` check in `TableSourceTable` is sufficient to check whether the table source implements the interface or not. Isn't this a good compromise between having a lean interface (also simple for Java users) and the possibility to override field names if necessary?
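The interface-extraction idea described in the comment above can be sketched in plain Java. This is an illustrative mock, not Flink's actual API: only the `DefinesFieldNames` name and the instanceof-style check come from the discussion; the simplified types and default names are invented.

```java
// Sketch of the lean-interface proposal: a minimal TableSource plus an
// optional interface for overriding field names. All types are stand-ins.

interface TableSource {
    String getReturnType();   // stand-in for Flink's TypeInformation
}

// Optional interface for sources that want to provide their own field names.
interface DefinesFieldNames {
    String[] getFieldNames();
}

class TableSourceTable {
    // Resolution logic: prefer the source's own names, otherwise fall
    // back to defaults derived from the type information.
    static String[] fieldNames(TableSource source) {
        if (source instanceof DefinesFieldNames) {
            return ((DefinesFieldNames) source).getFieldNames();
        }
        // Placeholder for the names the real code derives via TableEnvironment.
        return new String[] {"f0", "f1"};
    }
}

// A source that keeps the lean interface ...
class CsvSource implements TableSource {
    @Override
    public String getReturnType() { return "Row(String, Int)"; }
}

// ... and one that opts in to custom field names.
class NamedCsvSource extends CsvSource implements DefinesFieldNames {
    @Override
    public String[] getFieldNames() { return new String[] {"name", "count"}; }
}
```

With this split, a plain `CsvSource` gets the derived defaults while `NamedCsvSource` overrides them, matching the compromise the comment describes.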
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804341#comment-15804341 ] ASF GitHub Bot commented on FLINK-5280: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3039 I like this idea. In this way, we only need to provide `BatchTableSource` and `StreamTableSource` interfaces, not involving the odd `BatchStreamTableSource`. We can keep the interface very clean. If I understand correctly, none of the concrete implementations of `TableSource` will implement `DefinesFieldNames` for now?
[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3039 @mushketyk no worries :-) @wuchong Since the methods are only added when needed by implementing the interface there is no default implementation. The logic of the default implementation (calling the static method of `TableEnvironment`) is directly put into the `TableSourceTable` and only replaced if the table source implements the new interface.
[jira] [Commented] (FLINK-4692) Add tumbling group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804286#comment-15804286 ] ASF GitHub Bot commented on FLINK-4692: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2938 I would like to shepherd this PR. > Add tumbling group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > Add Tumble group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
[jira] [Created] (FLINK-5418) Estimated row size does not support nested types
Timo Walther created FLINK-5418: --- Summary: Estimated row size does not support nested types Key: FLINK-5418 URL: https://issues.apache.org/jira/browse/FLINK-5418 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Operations that use {{org.apache.flink.table.plan.nodes.FlinkRel#estimateRowSize}} do not support nested types yet and fail with: {code} java.lang.AssertionError: Internal error: Error occurred while applying rule DataSetMinusRule at org.apache.calcite.util.Util.newInternal(Util.java:792) at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148) at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225) at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334) at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:256) at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:288) at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140) at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40) at org.apache.flink.table.api.scala.batch.table.SetOperatorsITCase.testMinus(SetOperatorsITCase.scala:175) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253) 
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: org.apache.flink.table.api.TableException: Unsupported data
[jira] [Updated] (FLINK-5418) Estimated row size does not support nested types
[ https://issues.apache.org/jira/browse/FLINK-5418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-5418: Affects Version/s: 1.2.0
[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/3039 @fhueske Sorry, there are only two methods. Please ignore my comment :) I think you are right and this seems like a good approach. If @wuchong is on board with this I'll update the PR accordingly.
[GitHub] flink pull request #3037: Flink-4450 update storm version to 1.0
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3037#discussion_r9492 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java --- @@ -186,15 +186,15 @@ public void testCreateTopologyContext() { .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); - LocalCluster cluster = new LocalCluster(); - Config c = new Config(); - c.setNumAckers(0); - cluster.submitTopology("test", c, builder.createTopology()); - - while (TestSink.result.size() != 8) { - Utils.sleep(100); - } - cluster.shutdown(); +// LocalCluster cluster = new LocalCluster(); --- End diff -- Are these lines accidentally commented out, or is this not working with Storm 1.0 any more?
[jira] [Closed] (FLINK-5401) Fails ConnectionUtilsTest#testReturnLocalHostAddressUsingHeuristics
[ https://issues.apache.org/jira/browse/FLINK-5401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5401. --- > Fails ConnectionUtilsTest#testReturnLocalHostAddressUsingHeuristics > --- > > Key: FLINK-5401 > URL: https://issues.apache.org/jira/browse/FLINK-5401 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.3 > Environment: macOS >Reporter: Anton Solovev > > {code} > java.lang.AssertionError: null > at org.junit.Assert.fail(Assert.java:86) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertTrue(Assert.java:52) > at > org.apache.flink.runtime.net.ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics(ConnectionUtilsTest.java:45) > {code} > in org.apache.flink.runtime.net.ConnectionUtilsTest while testing 1.1.3 RC2
[jira] [Resolved] (FLINK-5401) Fails ConnectionUtilsTest#testReturnLocalHostAddressUsingHeuristics
[ https://issues.apache.org/jira/browse/FLINK-5401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5401. - Resolution: Invalid Has already been solved in newer versions of Flink via 36c09b0996404eac47abfc3ef8387a5f353f9756 > Fails ConnectionUtilsTest#testReturnLocalHostAddressUsingHeuristics > --- > > Key: FLINK-5401 > URL: https://issues.apache.org/jira/browse/FLINK-5401 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.3 > Environment: macOS >Reporter: Anton Solovev > > {code} > java.lang.AssertionError: null > at org.junit.Assert.fail(Assert.java:86) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertTrue(Assert.java:52) > at > org.apache.flink.runtime.net.ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics(ConnectionUtilsTest.java:45) > {code} > in org.apache.flink.runtime.net.ConnectionUtilsTest while testing 1.1.3 RC2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3071: [FLINK-5417][DOCUMENTATION]correct the wrong config file ...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3071 You should be able to open the SVG file with any text editor and add the license manually.
[jira] [Commented] (FLINK-5417) Fix the wrong config file name
[ https://issues.apache.org/jira/browse/FLINK-5417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804374#comment-15804374 ] ASF GitHub Bot commented on FLINK-5417: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3071 you should be able to open the svg file with any text editor and add the license manually. > Fix the wrong config file name > --- > > Key: FLINK-5417 > URL: https://issues.apache.org/jira/browse/FLINK-5417 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Tao Wang >Priority: Trivial > > As the config file name is conf/flink-conf.yaml, the usage > "conf/flink-config.yaml" in document is wrong and easy to confuse user. We > should correct them. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3072: [Flink-5378] Update Scopt version to 3.5.0
GitHub user LorenzBuehmann opened a pull request: https://github.com/apache/flink/pull/3072 [Flink-5378] Update Scopt version to 3.5.0 This will also allow for using comma-separated values in the CLI. Note, as per https://github.com/scopt/scopt/releases/tag/v3.5.0 Scopt 3.5.0 introduces two-column rendering for the usage text, which is enabled by default. From my point of view this is even better as it's more compact and easier to read. You can merge this pull request into a Git repository by running: $ git pull https://github.com/LorenzBuehmann/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3072.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3072 commit 85aa6079fe610fe4cce066da90edcc7cf87a9704 Author: Lorenz Buehmann Date: 2017-01-06T12:34:13Z [Flink-5378] Bumped Scopt version to 3.5.0. This will also allow for using comma-separated values in the CLI.
[GitHub] flink issue #3070: [FLINK-5119][web-frontend] Fix problems in displaying TM ...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3070 Changes look good, adding them to my next batch.
[GitHub] flink issue #3069: [FLINK-5381][web-frontend] Fix scrolling issues
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3069 Changes look good, adding them to my next batch.
[jira] [Commented] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)
[ https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804502#comment-15804502 ] ASF GitHub Bot commented on FLINK-5381: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3069 Changes look good, adding them to my next batch. > Scrolling in some web interface pages doesn't work (taskmanager details, > jobmanager config) > --- > > Key: FLINK-5381 > URL: https://issues.apache.org/jira/browse/FLINK-5381 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0 >Reporter: Robert Metzger > > It seems that scrolling in the web interface doesn't work anymore on some > pages in the 1.2 release branch. > Example pages: > - When you click the "JobManager" tab > - The TaskManager logs page -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5119) Last taskmanager heartbeat not showing in web frontend
[ https://issues.apache.org/jira/browse/FLINK-5119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804503#comment-15804503 ] ASF GitHub Bot commented on FLINK-5119: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3070 Changes look good, adding them to my next batch. > Last taskmanager heartbeat not showing in web frontend > -- > > Key: FLINK-5119 > URL: https://issues.apache.org/jira/browse/FLINK-5119 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.1.3 >Reporter: Ufuk Celebi > > The web frontend does not list anything for the last heartbeat in the web > frontend. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5419) Taskmanager metrics not accessible via REST
Chesnay Schepler created FLINK-5419: --- Summary: Taskmanager metrics not accessible via REST Key: FLINK-5419 URL: https://issues.apache.org/jira/browse/FLINK-5419 Project: Flink Issue Type: Bug Components: Metrics, Webfrontend Affects Versions: 1.2.0, 1.3.0 Reporter: Chesnay Schepler Priority: Blocker Fix For: 1.2.0, 1.3.0 There is currently a URL clash between the TaskManagersHandler and TaskManagerMetricsHandler, with both being routed to {code} /taskmanagers/:taskmanagerid/metrics {code} As a result it is not possible to query the full set of metrics for a taskmanager, but only the hard-coded subset that is displayed on the metrics tab on the taskmanager page. This is a side-effect of 6d53bbc4b92e651786ecc8c2c6dfeb8e450a16a3 making the URLs more consistent. The TaskManager page in the web-interface has 3 tabs: Metrics, Log and Stdout. The URLs for these tabs are {code} /taskmanager//metrics /taskmanager//log /taskmanager//stdout {code} which correspond to the REST URLs used. Previously, the metrics tab used {code}/taskmanager/{code} However, 70704de0c82cbb7b143dd696221e11999feb3600 then exposed the metrics gathered by the metrics system through the REST API. The assumption was that general information for the taskmanagers is retrieved via /taskmanager/, similar to how the job-related URLs are structured, which sadly isn't the case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
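The clash described in the issue can be illustrated with a toy router. This is an illustrative sketch, not Flink's actual handler/router code: when two handlers are registered under the same URL pattern, the second registration cannot take effect, so its handler becomes unreachable.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal route table: one handler name per URL pattern.
class Router {
    private final Map<String, String> routes = new HashMap<>();

    /** Returns false if the pattern is already taken by another handler. */
    boolean register(String pattern, String handlerName) {
        // putIfAbsent keeps the first registration and rejects the clash,
        // mirroring how the second handler ends up shadowed/unreachable.
        return routes.putIfAbsent(pattern, handlerName) == null;
    }

    String handlerFor(String pattern) {
        return routes.get(pattern);
    }
}
```

With both `TaskManagersHandler` and `TaskManagerMetricsHandler` bound to `/taskmanagers/:taskmanagerid/metrics`, only the first one answers requests; the fix is to give the two handlers distinct URL patterns.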
[jira] [Commented] (FLINK-5418) Estimated row size does not support nested types
[ https://issues.apache.org/jira/browse/FLINK-5418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804506#comment-15804506 ] ASF GitHub Bot commented on FLINK-5418: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/3073 [FLINK-5418] [table] Estimated row size does not support nested types This adds an estimate for array, composite and generic types. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-5418 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3073.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3073 commit 8472588f35d95c2d227a3387090d3a6e6ded343d Author: twalthrDate: 2017-01-06T12:59:38Z [FLINK-5418] [table] Estimated row size does not support nested types > Estimated row size does not support nested types > > > Key: FLINK-5418 > URL: https://issues.apache.org/jira/browse/FLINK-5418 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Timo Walther >Assignee: Timo Walther > > Operations that use > {{org.apache.flink.table.plan.nodes.FlinkRel#estimateRowSize}} do not support > nested types yet and fail with: > {code} > java.lang.AssertionError: Internal error: Error occurred while applying rule > DataSetMinusRule > at org.apache.calcite.util.Util.newInternal(Util.java:792) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148) > at > org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225) > at > org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819) > at > 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:256) > at > org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:288) > at > org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140) > at > org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40) > at > org.apache.flink.table.api.scala.batch.table.SetOperatorsITCase.testMinus(SetOperatorsITCase.scala:175) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at 
org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at >
[GitHub] flink pull request #3073: [FLINK-5418] [table] Estimated row size does not s...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/3073 [FLINK-5418] [table] Estimated row size does not support nested types This adds an estimate for array, composite and generic types. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-5418 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3073.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3073 commit 8472588f35d95c2d227a3387090d3a6e6ded343d Author: twalthr Date: 2017-01-06T12:59:38Z [FLINK-5418] [table] Estimated row size does not support nested types
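The nature of the fix, recursing into nested types when estimating a row's size instead of failing on them, can be sketched as follows. This is a hypothetical simplification; the real `FlinkRel#estimateRowSize` operates on Calcite's `RelDataType`, and the byte estimates here are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Toy type model: a field is either atomic (fixed estimate) or composite
// (a nested row whose estimate is the sum of its fields' estimates).
class RowSizeEstimator {
    interface FieldType {
        double estimateSize();
    }

    static class AtomicType implements FieldType {
        final double bytes;
        AtomicType(double bytes) { this.bytes = bytes; }
        public double estimateSize() { return bytes; }
    }

    static class CompositeType implements FieldType {
        final List<FieldType> fields;
        CompositeType(FieldType... fields) { this.fields = Arrays.asList(fields); }

        // Recurse into nested rows rather than rejecting them, which is
        // what previously caused the AssertionError during optimization.
        public double estimateSize() {
            double sum = 0;
            for (FieldType f : fields) {
                sum += f.estimateSize();
            }
            return sum;
        }
    }
}
```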
[jira] [Updated] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)
[ https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5381: Affects Version/s: 1.3.0 > Scrolling in some web interface pages doesn't work (taskmanager details, > jobmanager config) > --- > > Key: FLINK-5381 > URL: https://issues.apache.org/jira/browse/FLINK-5381 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Robert Metzger > Fix For: 1.2.0, 1.3.0 > > > It seems that scrolling in the web interface doesn't work anymore on some > pages in the 1.2 release branch. > Example pages: > - When you click the "JobManager" tab > - The TaskManager logs page -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)
[ https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5381: Assignee: Sachin Goel > Scrolling in some web interface pages doesn't work (taskmanager details, > jobmanager config) > --- > > Key: FLINK-5381 > URL: https://issues.apache.org/jira/browse/FLINK-5381 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Robert Metzger >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > It seems that scrolling in the web interface doesn't work anymore on some > pages in the 1.2 release branch. > Example pages: > - When you click the "JobManager" tab > - The TaskManager logs page -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)
[ https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5381: Component/s: Webfrontend > Scrolling in some web interface pages doesn't work (taskmanager details, > jobmanager config) > --- > > Key: FLINK-5381 > URL: https://issues.apache.org/jira/browse/FLINK-5381 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Robert Metzger >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > It seems that scrolling in the web interface doesn't work anymore on some > pages in the 1.2 release branch. > Example pages: > - When you click the "JobManager" tab > - The TaskManager logs page -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)
[ https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5381: Fix Version/s: 1.3.0 1.2.0 > Scrolling in some web interface pages doesn't work (taskmanager details, > jobmanager config) > --- > > Key: FLINK-5381 > URL: https://issues.apache.org/jira/browse/FLINK-5381 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Robert Metzger >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > It seems that scrolling in the web interface doesn't work anymore on some > pages in the 1.2 release branch. > Example pages: > - When you click the "JobManager" tab > - The TaskManager logs page -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5119) Last taskmanager heartbeat not showing in web frontend
[ https://issues.apache.org/jira/browse/FLINK-5119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5119: Affects Version/s: 1.3.0 1.2.0 > Last taskmanager heartbeat not showing in web frontend > -- > > Key: FLINK-5119 > URL: https://issues.apache.org/jira/browse/FLINK-5119 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.1.3, 1.3.0 >Reporter: Ufuk Celebi >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > The web frontend does not list anything for the last heartbeat in the web > frontend. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5359) Job Exceptions view doesn't scroll
[ https://issues.apache.org/jira/browse/FLINK-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5359: Fix Version/s: 1.3.0 1.2.0 > Job Exceptions view doesn't scroll > --- > > Key: FLINK-5359 > URL: https://issues.apache.org/jira/browse/FLINK-5359 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Eron Wright >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > The exception information is cut off because the details panel doesn't have a > scrollbar. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5359) Job Exceptions view doesn't scroll
[ https://issues.apache.org/jira/browse/FLINK-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5359: Assignee: Sachin Goel > Job Exceptions view doesn't scroll > --- > > Key: FLINK-5359 > URL: https://issues.apache.org/jira/browse/FLINK-5359 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Eron Wright >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > The exception information is cut off because the details panel doesn't have a > scrollbar. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5267) TaskManager logs not scrollable
[ https://issues.apache.org/jira/browse/FLINK-5267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5267: Affects Version/s: 1.3.0 1.2.0 > TaskManager logs not scrollable > --- > > Key: FLINK-5267 > URL: https://issues.apache.org/jira/browse/FLINK-5267 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 > Environment: DC/OS 1.8 >Reporter: Mischa Krüger > Fix For: 1.2.0, 1.3.0 > > > Latest master build, have run the quickstart example successfully and wanted > to see the TM logs, but they can't be scrolled. Download the log is luckily > still possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5359) Job Exceptions view doesn't scroll
[ https://issues.apache.org/jira/browse/FLINK-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5359: Affects Version/s: 1.3.0 1.2.0 > Job Exceptions view doesn't scroll > --- > > Key: FLINK-5359 > URL: https://issues.apache.org/jira/browse/FLINK-5359 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Eron Wright >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > The exception information is cut off because the details panel doesn't have a > scrollbar. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5119) Last taskmanager heartbeat not showing in web frontend
[ https://issues.apache.org/jira/browse/FLINK-5119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5119: Fix Version/s: 1.3.0 1.2.0 > Last taskmanager heartbeat not showing in web frontend > -- > > Key: FLINK-5119 > URL: https://issues.apache.org/jira/browse/FLINK-5119 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.1.3, 1.3.0 >Reporter: Ufuk Celebi >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > The web frontend does not list anything for the last heartbeat in the web > frontend. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5119) Last taskmanager heartbeat not showing in web frontend
[ https://issues.apache.org/jira/browse/FLINK-5119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5119: Assignee: Sachin Goel > Last taskmanager heartbeat not showing in web frontend > -- > > Key: FLINK-5119 > URL: https://issues.apache.org/jira/browse/FLINK-5119 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.1.3, 1.3.0 >Reporter: Ufuk Celebi >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > The web frontend does not list anything for the last heartbeat in the web > frontend. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5267) TaskManager logs not scrollable
[ https://issues.apache.org/jira/browse/FLINK-5267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5267: Fix Version/s: 1.3.0 1.2.0 > TaskManager logs not scrollable > --- > > Key: FLINK-5267 > URL: https://issues.apache.org/jira/browse/FLINK-5267 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 > Environment: DC/OS 1.8 >Reporter: Mischa Krüger > Fix For: 1.2.0, 1.3.0 > > > Latest master build, have run the quickstart example successfully and wanted > to see the TM logs, but they can't be scrolled. Download the log is luckily > still possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5267) TaskManager logs not scrollable
[ https://issues.apache.org/jira/browse/FLINK-5267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5267: Assignee: Sachin Goel > TaskManager logs not scrollable > --- > > Key: FLINK-5267 > URL: https://issues.apache.org/jira/browse/FLINK-5267 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0, 1.3.0 > Environment: DC/OS 1.8 >Reporter: Mischa Krüger >Assignee: Sachin Goel > Fix For: 1.2.0, 1.3.0 > > > Latest master build, have run the quickstart example successfully and wanted > to see the TM logs, but they can't be scrolled. Download the log is luckily > still possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5368) Let Kafka consumer show something when it fails to read one topic out of topic list
[ https://issues.apache.org/jira/browse/FLINK-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804525#comment-15804525 ] ASF GitHub Bot commented on FLINK-5368: --- Github user HungUnicorn commented on a diff in the pull request: https://github.com/apache/flink/pull/3036#discussion_r94943539 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java --- @@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d if (partitionsForTopic != null) { partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); } + else{ + LOG.info("Unable to retrieve any partitions for the requested topic: {}", topic); + } } } - if (partitions.isEmpty()) { - throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); - } --- End diff -- To throw exception fits our case better than to log INFO message, but @rmetzger suggested that some cases might be fine and to log INFO is more general if I understood correctly. > Let Kafka consumer show something when it fails to read one topic out of > topic list > --- > > Key: FLINK-5368 > URL: https://issues.apache.org/jira/browse/FLINK-5368 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Sendoh >Assignee: Sendoh >Priority: Critical > > As a developer when reading data from many topics, I want Kafka consumer to > show something if any topic is not available. The motivation is we read many > topics as list at one time, and sometimes we fail to recognize that one or > two topics' names have been changed or deprecated, and Flink Kafka connector > doesn't show the error. > My proposed change would be either to throw RuntimeException or to use > LOG.error(topic + "doesn't have any partition") if partitionsForTopic is null > at this function. 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java#L208 > Any suggestion is welcome. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3036: [FLINK-5368] Log msg if kafka topic doesn't have a...
Github user HungUnicorn commented on a diff in the pull request: https://github.com/apache/flink/pull/3036#discussion_r94943539 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java --- @@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d if (partitionsForTopic != null) { partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); } + else{ + LOG.info("Unable to retrieve any partitions for the requested topic: {}", topic); + } } } - if (partitions.isEmpty()) { - throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); - } --- End diff -- Throwing an exception fits our case better than logging an INFO message, but @rmetzger suggested that some cases might be fine and that logging at INFO is more general, if I understood correctly.
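A middle ground between the two positions in the review, logging per topic while still failing when nothing at all is readable, can be sketched like this. This is a simplified standalone sketch, not the actual `FlinkKafkaConsumer09` code: the `metadata` map stands in for the partition lookup against the Kafka brokers, and partitions are plain integers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

class PartitionDiscovery {
    private static final Logger LOG = Logger.getLogger("PartitionDiscovery");

    static List<Integer> discover(List<String> topics, Map<String, List<Integer>> metadata) {
        List<Integer> partitions = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitionsForTopic = metadata.get(topic);
            if (partitionsForTopic != null) {
                partitions.addAll(partitionsForTopic);
            } else {
                // Per-topic problem (renamed/deprecated topic): surface it,
                // but keep reading the remaining topics.
                LOG.info("Unable to retrieve any partitions for the requested topic: " + topic);
            }
        }
        if (partitions.isEmpty()) {
            // Nothing readable at all: fail fast, as the original code did.
            throw new RuntimeException(
                "Unable to retrieve any partitions for the requested topics " + topics);
        }
        return partitions;
    }
}
```

This keeps the hard failure for a fully misconfigured topic list while still making a single bad topic visible in the logs.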
[GitHub] flink issue #3065: [hotfix] [table] Enable all CalciteConfigBuilderTest test...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3065 Since the tests appear to pass on travis, +1 to merge.
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805194#comment-15805194 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3057 @EronWright thanks a lot for this fix, I am looking though it now. From the description in the JIRAs, I take that this adds the logic that reads custom JAAS security configurations and uses Flink's internal one (Hadoop UGI based) for fallback default? This should be relevant for the `master` and the 1.2 release branch, correct? > Rework JAAS configuration to support user-supplied entries > -- > > Key: FLINK-5364 > URL: https://issues.apache.org/jira/browse/FLINK-5364 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Critical > Labels: kerberos, security > > Recent issues (see linked) have brought to light a critical deficiency in the > handling of JAAS configuration. > 1. the MapR distribution relies on an explicit JAAS conf, rather than > in-memory conf used by stock Hadoop. > 2. the ZK/Kafka/Hadoop security configuration is supposed to be independent > (one can enable each element separately) but isn't. > Perhaps we should rework the JAAS conf code to merge any user-supplied > configuration with our defaults, rather than using an all-or-nothing > approach. > We should also address some recent regressions: > 1. The HadoopSecurityContext should be installed regardless of auth mode, to > login with UserGroupInformation, which: > - handles the HADOOP_USER_NAME variable. > - installs an OS-specific user principal (from UnixLoginModule etc.) > unrelated to Kerberos. > - picks up the HDFS/HBASE delegation tokens. > 2. Fix the use of alternative authentication methods - delegation tokens and > Kerberos ticket cache. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3057: [FLINK-5364] Rework JAAS configuration to support user-su...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3057 @EronWright thanks a lot for this fix, I am looking through it now. From the description in the JIRAs, I take it that this adds the logic that reads custom JAAS security configurations and uses Flink's internal one (Hadoop UGI based) as the fallback default? This should be relevant for the `master` and the 1.2 release branch, correct?
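One way to read "merge user-supplied configuration with our defaults, rather than using an all-or-nothing approach" is a delegating `javax.security.auth.login.Configuration` that falls back per login context. This is an assumption about the design, not Flink's actual implementation; the module names used below are placeholders.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Consult the user-supplied JAAS Configuration first; fall back to the
// internal default entries only for login contexts the user did not define.
class DelegatingJaasConfiguration extends Configuration {
    private final Configuration userConfig;          // may be null
    private final AppConfigurationEntry[] defaults;  // internal fallback entries

    DelegatingJaasConfiguration(Configuration userConfig, AppConfigurationEntry[] defaults) {
        this.userConfig = userConfig;
        this.defaults = defaults;
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        if (userConfig != null) {
            AppConfigurationEntry[] userEntries = userConfig.getAppConfigurationEntry(name);
            if (userEntries != null) {
                return userEntries; // the user-supplied entry wins
            }
        }
        // Per-entry fallback instead of all-or-nothing.
        return defaults;
    }
}
```

Installing such a configuration via `Configuration.setConfiguration(...)` would let an explicit JAAS file (as MapR requires) coexist with internally generated Kerberos entries for ZooKeeper and Kafka.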
[GitHub] flink issue #3059: [docs] Clarify restart strategy defaults set by checkpoin...
Github user rehevkor5 commented on the issue: https://github.com/apache/flink/pull/3059 The changes so far are docs-only, so the CI failure must be unrelated.
[jira] [Closed] (FLINK-3746) WebRuntimeMonitorITCase.testNoCopyFromJar failing intermittently
[ https://issues.apache.org/jira/browse/FLINK-3746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boris Osipov closed FLINK-3746. --- Resolution: Fixed Was fixed with FLINK-4255 > WebRuntimeMonitorITCase.testNoCopyFromJar failing intermittently > > > Key: FLINK-3746 > URL: https://issues.apache.org/jira/browse/FLINK-3746 > Project: Flink > Issue Type: Bug >Reporter: Todd Lisonbee >Assignee: Boris Osipov >Priority: Minor > Labels: flaky-test > > Test failed randomly in Travis, > https://s3.amazonaws.com/archive.travis-ci.org/jobs/122624299/log.txt > Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 13.127 sec > <<< FAILURE! - in org.apache.flink.runtime.webmonitor.WebRuntimeMonitorITCase > testNoCopyFromJar(org.apache.flink.runtime.webmonitor.WebRuntimeMonitorITCase) > Time elapsed: 0.124 sec <<< FAILURE! > java.lang.AssertionError: expected:<200 OK> but was:<503 Service Unavailable> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.webmonitor.WebRuntimeMonitorITCase.testNoCopyFromJar(WebRuntimeMonitorITCase.java:456) > Results : > Failed tests: > WebRuntimeMonitorITCase.testNoCopyFromJar:456 expected:<200 OK> but > was:<503 Service Unavailable> -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5178) allow BlobCache touse a distributed file system irrespective of the HA mode
[ https://issues.apache.org/jira/browse/FLINK-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-5178: --- Description: After FLINK-5129, high availability (HA) mode adds the ability for the BlobCache instances at the task managers to download blobs directly from the distributed file system. It would be nice if this also worked in non-HA mode. (was: After FLINK-5129, high availability (HA) mode adds the ability for the BlobCache instances at the task managers to download blobs directly from the distributed file system. It would be nice if this also worked in non-HA mode and BLOB_STORAGE_DIRECTORY_KEY may point to a distributed file system.) Summary: allow BlobCache touse a distributed file system irrespective of the HA mode (was: allow BLOB_STORAGE_DIRECTORY_KEY to point to a distributed file system) > allow BlobCache touse a distributed file system irrespective of the HA mode > --- > > Key: FLINK-5178 > URL: https://issues.apache.org/jira/browse/FLINK-5178 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber > > After FLINK-5129, high availability (HA) mode adds the ability for the > BlobCache instances at the task managers to download blobs directly from the > distributed file system. It would be nice if this also worked in non-HA mode. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5178) allow BlobCache to use a distributed file system irrespective of the HA mode
[ https://issues.apache.org/jira/browse/FLINK-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-5178: --- Summary: allow BlobCache to use a distributed file system irrespective of the HA mode (was: allow BlobCache touse a distributed file system irrespective of the HA mode) > allow BlobCache to use a distributed file system irrespective of the HA mode > > > Key: FLINK-5178 > URL: https://issues.apache.org/jira/browse/FLINK-5178 > Project: Flink > Issue Type: Improvement > Components: Network >Reporter: Nico Kruber >Assignee: Nico Kruber > > After FLINK-5129, high availability (HA) mode adds the ability for the > BlobCache instances at the task managers to download blobs directly from the > distributed file system. It would be nice if this also worked in non-HA mode. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
[ https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805207#comment-15805207 ] ASF GitHub Bot commented on FLINK-5355: --- GitHub user skidder opened a pull request: https://github.com/apache/flink/pull/3078 [FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector My Flink job that consumes from a Kinesis stream must be restarted at least once daily due to an uncaught AmazonKinesisException when reading from Kinesis. The complete stacktrace looks like: ``` com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: dc1b7a1a-1b97-1a32-8cd5-79a896a55223) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601) at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447) at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747) at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723) at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193) at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` It's interesting that the Kinesis endpoint returned a 500 status code, but that's outside the scope of this issue. I think we can handle this exception in the same manner as a ProvisionedThroughputException: performing an exponential backoff and retrying a finite number of times before throwing an exception. You can merge this pull request into a Git repository by running: $ git pull https://github.com/skidder/flink skidder/flink-5355 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3078.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3078 commit 85adf13f62351675e39811b9cb58aa2ac9a9cd4d Author: Scott Kidder Date: 2016-12-16T16:46:54Z [FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector > Handle AmazonKinesisException gracefully in Kinesis Streaming Connector > --- > > Key: FLINK-5355 > URL: https://issues.apache.org/jira/browse/FLINK-5355 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.2.0, 1.1.3 >Reporter: Scott Kidder >Assignee: Scott Kidder > > My Flink job that consumes from a Kinesis stream must be restarted at least > once daily due to an uncaught AmazonKinesisException when reading from > Kinesis. 
The complete stacktrace looks like: > {noformat} > com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: > AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: > dc1b7a1a-1b97-1a32-8cd5-79a896a55223) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964) > at >
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
GitHub user skidder opened a pull request: https://github.com/apache/flink/pull/3078 [FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector My Flink job that consumes from a Kinesis stream must be restarted at least once daily due to an uncaught AmazonKinesisException when reading from Kinesis. The complete stacktrace looks like: ``` com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: dc1b7a1a-1b97-1a32-8cd5-79a896a55223) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601) at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447) at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747) at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723) at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` It's interesting that the Kinesis endpoint returned a 500 status code, but that's outside the scope of this issue. I think we can handle this exception in the same manner as a ProvisionedThroughputException: performing an exponential backoff and retrying a finite number of times before throwing an exception. You can merge this pull request into a Git repository by running: $ git pull https://github.com/skidder/flink skidder/flink-5355 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3078.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3078 commit 85adf13f62351675e39811b9cb58aa2ac9a9cd4d Author: Scott Kidder Date: 2016-12-16T16:46:54Z [FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
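The retry approach the PR describes (treat the transient 500 the same way as a ProvisionedThroughputException: back off exponentially, retry a bounded number of times, then rethrow) can be sketched as below. This is an illustrative helper under assumed names (`BackoffRetry`, `callWithBackoff`), not the Kinesis connector's actual API:

```java
import java.util.concurrent.Callable;

/** Illustrative sketch: exponential backoff with a finite attempt cap. */
public class BackoffRetry {

    static <T> T callWithBackoff(Callable<T> call, int maxAttempts, long baseBackoffMillis)
            throws Exception {
        long backoff = baseBackoffMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;               // e.g. a transient service error (HTTP 5xx)
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2;       // double the wait before the next attempt
                }
            }
        }
        throw last;                      // give up after the final attempt
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds -- mimics a transient "InternalFailure (500)".
        String result = callWithBackoff(() -> {
            if (++calls[0] < 3) throw new RuntimeException("InternalFailure (HTTP 500)");
            return "records";
        }, 5, 1L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

A production version would typically also cap the total backoff and only retry exceptions known to be transient, rather than all `Exception`s as this sketch does.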
[jira] [Created] (FLINK-5424) Improve Restart Strategy Logging
Shannon Carey created FLINK-5424: Summary: Improve Restart Strategy Logging Key: FLINK-5424 URL: https://issues.apache.org/jira/browse/FLINK-5424 Project: Flink Issue Type: Improvement Components: Core Reporter: Shannon Carey Assignee: Shannon Carey Priority: Minor I'll be submitting a PR which includes some minor improvements to logging related to restart strategies. Specifically, I added a toString so that the log contains better info about failure-rate restart strategy, and I added an explanation in the log when the restart strategy is responsible for preventing job restart (currently, there's no indication that the restart strategy had anything to do with it). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
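The kind of `toString` the ticket proposes can be sketched as below; the class and field names here are hypothetical stand-ins, not Flink's actual failure-rate restart strategy implementation:

```java
/** Illustrative sketch only: names are hypothetical, not Flink's implementation. */
public class FailureRateRestartStrategyInfo {
    private final int maxFailuresPerInterval;
    private final long failuresIntervalMs;
    private final long delayBetweenRestartsMs;

    public FailureRateRestartStrategyInfo(
            int maxFailuresPerInterval, long failuresIntervalMs, long delayBetweenRestartsMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failuresIntervalMs = failuresIntervalMs;
        this.delayBetweenRestartsMs = delayBetweenRestartsMs;
    }

    // Without a toString, a log line shows only "FailureRateRestartStrategyInfo@1a2b3c";
    // with one, the operator can read the configured limits directly from the log.
    @Override
    public String toString() {
        return "FailureRateRestartStrategy(maxFailuresPerInterval=" + maxFailuresPerInterval
            + ", failuresInterval=" + failuresIntervalMs + " ms"
            + ", delayBetweenRestarts=" + delayBetweenRestartsMs + " ms)";
    }

    public static void main(String[] args) {
        System.out.println("Using restart strategy: "
            + new FailureRateRestartStrategyInfo(3, 60000L, 1000L));
    }
}
```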
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805618#comment-15805618 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3057 @EronWright Thanks for explaining. I figured it out concurrently by looking at the updated documentation you wrote ;-) The docs are good, helped a lot! > Rework JAAS configuration to support user-supplied entries > -- > > Key: FLINK-5364 > URL: https://issues.apache.org/jira/browse/FLINK-5364 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Critical > Labels: kerberos, security > > Recent issues (see linked) have brought to light a critical deficiency in the > handling of JAAS configuration. > 1. the MapR distribution relies on an explicit JAAS conf, rather than > in-memory conf used by stock Hadoop. > 2. the ZK/Kafka/Hadoop security configuration is supposed to be independent > (one can enable each element separately) but isn't. > Perhaps we should rework the JAAS conf code to merge any user-supplied > configuration with our defaults, rather than using an all-or-nothing > approach. > We should also address some recent regressions: > 1. The HadoopSecurityContext should be installed regardless of auth mode, to > login with UserGroupInformation, which: > - handles the HADOOP_USER_NAME variable. > - installs an OS-specific user principal (from UnixLoginModule etc.) > unrelated to Kerberos. > - picks up the HDFS/HBASE delegation tokens. > 2. Fix the use of alternative authentication methods - delegation tokens and > Kerberos ticket cache. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3057: [FLINK-5364] Rework JAAS configuration to support user-su...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3057 @EronWright Thanks for explaining. I figured it out concurrently by looking at the updated documentation you wrote ;-) The docs are good, helped a lot! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805600#comment-15805600 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95009232 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { - if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - - // establish the JAAS config - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); - javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - - populateSystemSecurityProperties(config.flinkConf); - - // establish the UGI login user - UserGroupInformation.setConfiguration(config.hadoopConf); - - // only configure Hadoop security if we have security enabled - if (UserGroupInformation.isSecurityEnabled()) { - - final UserGroupInformation loginUser; - - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { - String keytabPath = (new File(config.keytab)).getAbsolutePath(); - - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - - loginUser = UserGroupInformation.getLoginUser(); - - // supplement with any available tokens - String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (fileLocation != null) { - /* -* Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are -* used in the context of reading the stored tokens from UGI. 
-* Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); -* loginUser.addCredentials(cred); - */ - try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - } - } else { - // login with current user credentials (e.g. ticket cache) - try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - - // note that the stored tokens are read automatically - loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List modules = new ArrayList(); + try { + for (Class moduleClass : config.getSecurityModules()) { +
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805608#comment-15805608 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95010038 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.security.modules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.security.DynamicConfiguration; +import org.apache.flink.runtime.security.KerberosUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +/** + * Responsible for installing a process-wide JAAS configuration. 
+ * + * The installed configuration combines login modules based on: + * - the user-supplied JAAS configuration file, if any + * - a Kerberos keytab, if configured + * - any cached Kerberos credentials from the current environment + * + * The module also installs a default JAAS config file (if necessary) for + * compatibility with ZK and Kafka. Note that the JRE actually draws on numerous file locations. + * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html + * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289 + */ +@Internal +public class JaasModule implements SecurityModule { + + private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class); + + static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; + + static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf"; + + private String priorConfigFile; + private javax.security.auth.login.Configuration priorConfig; + + private DynamicConfiguration currentConfig; + + @Override + public void install(SecurityUtils.SecurityConfiguration securityConfig) { + + // ensure that a config file is always defined, for compatibility with + // ZK and Kafka which check for the system property and existence of the file + priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null); + if (priorConfigFile == null) { + File configFile = generateDefaultConfigFile(); + System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath()); + } + + // read the JAAS configuration file + priorConfig = javax.security.auth.login.Configuration.getConfiguration(); + + // construct a dynamic JAAS configuration + currentConfig = new DynamicConfiguration(priorConfig); + + // wire up the configured JAAS login contexts to use the krb5 entries + AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig); + 
if(krb5Entries != null) { + for (String app : securityConfig.getLoginContextNames()) { + currentConfig.addAppConfigurationEntry(app, krb5Entries); + } + } + + javax.security.auth.login.Configuration.setConfiguration(currentConfig); + } + + @Override + public void uninstall() { + if(priorConfigFile != null) { +
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805607#comment-15805607 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95007014 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { - if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - - // establish the JAAS config - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); - javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - - populateSystemSecurityProperties(config.flinkConf); - - // establish the UGI login user - UserGroupInformation.setConfiguration(config.hadoopConf); - - // only configure Hadoop security if we have security enabled - if (UserGroupInformation.isSecurityEnabled()) { - - final UserGroupInformation loginUser; - - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { - String keytabPath = (new File(config.keytab)).getAbsolutePath(); - - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - - loginUser = UserGroupInformation.getLoginUser(); - - // supplement with any available tokens - String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (fileLocation != null) { - /* -* Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are -* used in the context of reading the stored tokens from UGI. 
-* Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); -* loginUser.addCredentials(cred); - */ - try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - } - } else { - // login with current user credentials (e.g. ticket cache) - try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - - // note that the stored tokens are read automatically - loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List modules = new ArrayList(); --- End diff -- Can you use `new ArrayList<>()` here? In general, it would be nice to make a pass
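The review comment asks for Java 7's diamond operator. For illustration, the difference between the raw type as it appears in the archived diff (the generic arguments may have been stripped in the email rendering), the pre-Java-7 style, and the diamond form:

```java
import java.util.ArrayList;
import java.util.List;

public class DiamondExample {
    public static void main(String[] args) {
        // Pre-diamond style: the type argument is repeated on the right-hand side.
        List<String> verbose = new ArrayList<String>();

        // Diamond operator (Java 7+): the compiler infers <String> from the target type,
        // keeping type safety without the repetition (a raw "new ArrayList()" would
        // compile only with an unchecked warning).
        List<String> concise = new ArrayList<>();

        verbose.add("a");
        concise.add("a");
        System.out.println(verbose.equals(concise));
    }
}
```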
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805601#comment-15805601 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r94997957 --- Diff: docs/internals/flink_security.md --- @@ -24,64 +24,109 @@ specific language governing permissions and limitations under the License. --> -This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) -and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers -who plans to run Flink on a secure environment. +This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), +filesystems, connectors, and state backends. ## Objective +The primary goals of the Flink Kerberos security infrastructure are: +1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka) +2. to authenticate to ZooKeeper (if configured to use SASL) +3. to authenticate to Hadoop components (e.g. HDFS, HBase) -The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario, -streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure -data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the -context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster. 
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure +data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token +or ticket cache entry. + +The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential +or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. ## How Flink Security works -Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. -A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security -requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period -of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security. +In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort, +Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication: -- Kafka (0.9) +- Kafka (0.9+) - HDFS +- HBase - ZooKeeper -Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation -(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context. 
+Note that it is possible to enable the use of Kerberos independently for each service or connector. For example, the user may enable +Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of +Kerberos credentials, which is then explicitly used by each component. + +The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which +are installed at startup. The next section describes each security module. + +### Hadoop Security Module +This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context. The login user is +then used for all interactions with Hadoop, including HDFS, HBase, and YARN. +
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805604#comment-15805604 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95009675 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.security.modules; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import java.io.File; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * Responsible for installing a Hadoop login user. 
+ */ +public class HadoopModule implements SecurityModule { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class); + + UserGroupInformation loginUser; + + @Override + public void install(SecurityUtils.SecurityConfiguration securityConfig) { + + UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration()); + + try { + if (UserGroupInformation.isSecurityEnabled() && + !StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) { + String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath(); + + UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath); + + loginUser = UserGroupInformation.getLoginUser(); + + // supplement with any available tokens + String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); + if (fileLocation != null) { + /* +* Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are +* used in the context of reading the stored tokens from UGI. +* Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); +* loginUser.addCredentials(cred); + */ + try { + Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", + File.class, org.apache.hadoop.conf.Configuration.class); + Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), + securityConfig.getHadoopConfiguration()); + Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", + Credentials.class); + addCredentialsMethod.invoke(loginUser, cred); + } catch (NoSuchMethodException e) { + LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); + } catch (InvocationTargetException e) { +
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805603#comment-15805603 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95007256 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { - if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - - // establish the JAAS config - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); - javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - - populateSystemSecurityProperties(config.flinkConf); - - // establish the UGI login user - UserGroupInformation.setConfiguration(config.hadoopConf); - - // only configure Hadoop security if we have security enabled - if (UserGroupInformation.isSecurityEnabled()) { - - final UserGroupInformation loginUser; - - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { - String keytabPath = (new File(config.keytab)).getAbsolutePath(); - - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - - loginUser = UserGroupInformation.getLoginUser(); - - // supplement with any available tokens - String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (fileLocation != null) { - /* -* Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are -* used in the context of reading the stored tokens from UGI. 
-* Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); -* loginUser.addCredentials(cred); - */ - try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - } - } else { - // login with current user credentials (e.g. ticket cache) - try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - - // note that the stored tokens are read automatically - loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List modules = new ArrayList(); + try { + for (Class moduleClass : config.getSecurityModules()) { +
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805605#comment-15805605 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95008182 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java ---
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805606#comment-15805606 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95007601 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java ---
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805609#comment-15805609 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95003470 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.security; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Array; + +import javax.annotation.Nullable; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + + +/** + * A dynamic JAAS configuration. + * + * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon + * an (optional) underlying configuration. Entries from the underlying configuration take + * precedence over dynamic entries. 
+ */ +public class DynamicConfiguration extends Configuration { + + protected static final Logger LOG = LoggerFactory.getLogger(DynamicConfiguration.class); + + private final Configuration delegate; + + private final Map<String, AppConfigurationEntry[]> dynamicEntries = new HashMap<>(); + + /** +* Create a dynamic configuration. +* @param delegate an underlying configuration to delegate to, or null. + */ + public DynamicConfiguration(@Nullable Configuration delegate) { + this.delegate = delegate; + } + + /** +* Add entries for the given application name. + */ + public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) { + final AppConfigurationEntry[] existing = dynamicEntries.get(name); + final AppConfigurationEntry[] updated; + if(existing == null) { + updated = Arrays.copyOf(entry, entry.length); + } + else { + updated = merge(existing, entry); + } + dynamicEntries.put(name, updated); + } + + /** +* Retrieve the AppConfigurationEntries for the specified name +* from this Configuration. +* +* +* +* @param name the name used to index the Configuration.
+* +* @return an array of AppConfigurationEntries for the specified name +* from this Configuration, or null if there are no entries +* for the specified name +*/ + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + AppConfigurationEntry[] entry = null; + if(delegate != null) { + entry = delegate.getAppConfigurationEntry(name); + } + final AppConfigurationEntry[] existing = dynamicEntries.get(name); + if(existing != null) { + if(entry != null) { + entry = merge(entry, existing); + } + else { + entry = Arrays.copyOf(existing, existing.length); + } + } + return entry; + } + + private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) { + AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length); + Array.copy(b, 0, merged, a.length, b.length); --- End diff -- Can we use `System.arraycopy()` here to avoid a dependency on Scala in this part of the code? > Rework JAAS configuration to support user-supplied entries >
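The reviewer's suggestion is straightforward to apply: `java.lang.System.arraycopy` performs the same bulk copy as Scala's `Array.copy` with no Scala dependency. A hedged, standalone sketch of the `merge` helper rewritten that way (generic here for illustration; the actual field type is `AppConfigurationEntry[]`):

```java
import java.util.Arrays;

public class EntryMerge {

    // Same copy semantics as DynamicConfiguration.merge, written with pure JDK
    // calls: copy a into a larger array, then bulk-copy b in after it.
    static <T> T[] merge(T[] a, T[] b) {
        T[] merged = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, merged, a.length, b.length);
        return merged;
    }

    public static void main(String[] args) {
        String[] merged = merge(new String[]{"krb5", "ticketCache"}, new String[]{"userSupplied"});
        System.out.println(Arrays.toString(merged));
    }
}
```

`Arrays.copyOf` already allocates the target with the correct runtime component type, so no reflective array creation is needed.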
[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries
[ https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805602#comment-15805602 ] ASF GitHub Bot commented on FLINK-5364: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r94994079 --- Diff: docs/internals/flink_security.md --- @@ -24,64 +24,109 @@ specific language governing permissions and limitations under the License. --> -This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) -and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers -who plans to run Flink on a secure environment. +This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), +filesystems, connectors, and state backends. ## Objective +The primary goals of the Flink Kerberos security infrastructure are: +1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka) +2. to authenticate to ZooKeeper (if configured to use SASL) +3. to authenticate to Hadoop components (e.g. HDFS, HBase) -The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario, -streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure -data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the -context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster. 
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure +data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token +or ticket cache entry. + +The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential +or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. --- End diff -- Maybe point out here that this refers to a "Flink Cluster" (a set of JobManager/TaskManager processes). One can run different jobs with different credentials next to each other in YARN by starting different per-job-clusters or Yarn/Mesos sessions. > Rework JAAS configuration to support user-supplied entries > -- > > Key: FLINK-5364 > URL: https://issues.apache.org/jira/browse/FLINK-5364 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Critical > Labels: kerberos, security > > Recent issues (see linked) have brought to light a critical deficiency in the > handling of JAAS configuration. > 1. the MapR distribution relies on an explicit JAAS conf, rather than > in-memory conf used by stock Hadoop. > 2. the ZK/Kafka/Hadoop security configuration is supposed to be independent > (one can enable each element separately) but isn't. > Perhaps we should rework the JAAS conf code to merge any user-supplied > configuration with our defaults, rather than using an all-or-nothing > approach. > We should also address some recent regressions: > 1. The HadoopSecurityContext should be installed regardless of auth mode, to > login with UserGroupInformation, which: > - handles the HADOOP_USER_NAME variable. > - installs an OS-specific user principal (from UnixLoginModule etc.) > unrelated to Kerberos. 
> - picks up the HDFS/HBASE delegation tokens. > 2. Fix the use of alternative authentication methods - delegation tokens and > Kerberos ticket cache. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3057: [FLINK-5364] Rework JAAS configuration to support ...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95010038 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.security.modules; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.security.DynamicConfiguration; +import org.apache.flink.runtime.security.KerberosUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +/** + * Responsible for installing a process-wide JAAS configuration. 
+ * + * The installed configuration combines login modules based on: + * - the user-supplied JAAS configuration file, if any + * - a Kerberos keytab, if configured + * - any cached Kerberos credentials from the current environment + * + * The module also installs a default JAAS config file (if necessary) for + * compatibility with ZK and Kafka. Note that the JRE actually draws on numerous file locations. + * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html + * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289 + */ +@Internal +public class JaasModule implements SecurityModule { + + private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class); + + static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; + + static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf"; + + private String priorConfigFile; + private javax.security.auth.login.Configuration priorConfig; + + private DynamicConfiguration currentConfig; + + @Override + public void install(SecurityUtils.SecurityConfiguration securityConfig) { + + // ensure that a config file is always defined, for compatibility with + // ZK and Kafka which check for the system property and existence of the file + priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null); + if (priorConfigFile == null) { + File configFile = generateDefaultConfigFile(); + System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath()); + } + + // read the JAAS configuration file + priorConfig = javax.security.auth.login.Configuration.getConfiguration(); + + // construct a dynamic JAAS configuration + currentConfig = new DynamicConfiguration(priorConfig); + + // wire up the configured JAAS login contexts to use the krb5 entries + AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig); + 
if(krb5Entries != null) { + for (String app : securityConfig.getLoginContextNames()) { + currentConfig.addAppConfigurationEntry(app, krb5Entries); + } + } + + javax.security.auth.login.Configuration.setConfiguration(currentConfig); + } + + @Override + public void uninstall() { + if(priorConfigFile != null) { + System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile); + } else { + System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG); + } + javax.security.auth.login.Configuration.setConfiguration(priorConfig);
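The `install`/`uninstall` pair above hinges on one detail: remembering whether the `java.security.auth.login.config` system property was previously set (a value) or unset (`null`), so that uninstalling restores the exact prior JVM state. A stripped-down sketch of that save/restore idiom, using a hypothetical property key:

```java
public class PropertySwap {

    // Hypothetical key for illustration; JaasModule uses
    // "java.security.auth.login.config".
    static final String KEY = "demo.jaas.config";

    private String prior;

    // Remember the prior value (null means "was unset") before overriding.
    void install(String value) {
        prior = System.getProperty(KEY, null);
        System.setProperty(KEY, value);
    }

    // Restore the prior value, or clear the property if it was never set,
    // so an install/uninstall cycle leaves the JVM unchanged.
    void uninstall() {
        if (prior != null) {
            System.setProperty(KEY, prior);
        } else {
            System.clearProperty(KEY);
        }
    }

    public static void main(String[] args) {
        PropertySwap swap = new PropertySwap();
        swap.install("/tmp/flink-jaas.conf");
        System.out.println(System.getProperty(KEY));
        swap.uninstall();
        System.out.println(System.getProperty(KEY));
    }
}
```

Without the `null` distinction, uninstalling would leave a stale property behind on JVMs that never had one, which is exactly the leak the `else` branch avoids.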
[GitHub] flink pull request #3057: [FLINK-5364] Rework JAAS configuration to support ...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r94997957 --- Diff: docs/internals/flink_security.md --- @@ -24,64 +24,109 @@ specific language governing permissions and limitations under the License. --> -This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN) -and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers -who plans to run Flink on a secure environment. +This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos), +filesystems, connectors, and state backends. ## Objective +The primary goals of the Flink Kerberos security infrastructure are: +1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka) +2. to authenticate to ZooKeeper (if configured to use SASL) +3. to authenticate to Hadoop components (e.g. HDFS, HBase) -The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario, -streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure -data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the -context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster. +In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure +data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token +or ticket cache entry. 
+ +The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential +or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. ## How Flink Security works -Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. -A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security -requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period -of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security. +In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort, +Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication: -- Kafka (0.9) +- Kafka (0.9+) - HDFS +- HBase - ZooKeeper -Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation -(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context. +Note that it is possible to enable the use of Kerberos independently for each service or connector. 
For example, the user may enable +Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of +Kerberos credentials, which is then explicitly used by each component. + +The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which +are installed at startup. The next section describes each security module. + +### Hadoop Security Module +This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context. The login user is +then used for all interactions with Hadoop, including HDFS, HBase, and YARN. + +If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured. Otherwise, +the login user conveys only the user identity of the OS account that launched the cluster. +
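For the "shared credential, per-service opt-in" model the doc describes, the credential is configured once and then wired into individual login contexts. A hedged sketch of what this might look like in `flink-conf.yaml` (option names as introduced by this rework; consult the security documentation of your Flink version before relying on them):

```yaml
# One Kerberos credential shared by the whole Flink cluster
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
# JAAS login contexts that should receive the credential
# (e.g. Kafka and ZooKeeper clients); services not listed here
# are left untouched, enabling per-service opt-in.
security.kerberos.login.contexts: Client,KafkaClient
```

Hadoop security itself remains governed by the Hadoop configuration (`core-site.xml`), independently of the contexts listed above.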
[GitHub] flink pull request #3057: [FLINK-5364] Rework JAAS configuration to support ...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95009232 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { - if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - - // establish the JAAS config - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); - javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - - populateSystemSecurityProperties(config.flinkConf); - - // establish the UGI login user - UserGroupInformation.setConfiguration(config.hadoopConf); - - // only configure Hadoop security if we have security enabled - if (UserGroupInformation.isSecurityEnabled()) { - - final UserGroupInformation loginUser; - - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { - String keytabPath = (new File(config.keytab)).getAbsolutePath(); - - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - - loginUser = UserGroupInformation.getLoginUser(); - - // supplement with any available tokens - String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (fileLocation != null) { - /* -* Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are -* used in the context of reading the stored tokens from UGI. 
-* Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); -* loginUser.addCredentials(cred); - */ - try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - } - } else { - // login with current user credentials (e.g. ticket cache) - try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - - // note that the stored tokens are read automatically - loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List modules = new ArrayList(); + try { + for (Class moduleClass : config.getSecurityModules()) { + SecurityModule module = moduleClass.newInstance(); + module.install(config); + modules.add(module); } + } +