[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-06 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804049#comment-15804049
 ] 

ramkrishna.s.vasudevan commented on FLINK-2168:
---

[~fhueske]
Can I work on this now? I can see that FLINK-3848 is done.

> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Wilmer DAZA
>Priority: Minor
>  Labels: starter
>
> Add an {{HBaseTableSource}} to read data from an HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2977: [FLINK-5084] Replace Java Table API integration te...

2017-01-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2977#discussion_r94921927
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsValidationTest.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit._
+
+class SetOperatorsValidationTest {
--- End diff --

This class should be moved into the `validation` package.
Can be done before merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5084) Replace Java Table API integration tests by unit tests

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804130#comment-15804130
 ] 

ASF GitHub Bot commented on FLINK-5084:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2977#discussion_r94921927
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsValidationTest.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit._
+
+class SetOperatorsValidationTest {
--- End diff --

This class should be moved into the `validation` package.
Can be done before merging


> Replace Java Table API integration tests by unit tests
> --
>
> Key: FLINK-5084
> URL: https://issues.apache.org/jira/browse/FLINK-5084
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Minor
>
> The Java Table API is a wrapper on top of the Scala Table API. 
> Instead of operating directly with Expressions like the Scala API, the Java 
> API accepts a String parameter which is parsed into Expressions.
> We could therefore replace the Java Table API ITCases by tests that check 
> that the parsing step produces a valid logical plan.
> This could be done by creating two {{Table}} objects for an identical query 
> once with the Scala Expression API and once with the Java String API and 
> comparing the logical plans of both {{Table}} objects. Basically something 
> like the following:
> {code}
> val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
> 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 
> 'g, 'h)
> val joinT1 = ds1.join(ds2).where('b === 'e).select('c, 'g)
> val joinT2 = ds1.join(ds2).where("b = e").select("c, g")
> val lPlan1 = joinT1.logicalPlan
> val lPlan2 = joinT2.logicalPlan
> Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4148) incorrect calculation distance in QuadTree

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804136#comment-15804136
 ] 

ASF GitHub Bot commented on FLINK-4148:
---

Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/2442
  
Looks good, please merge. Should have been fixed long ago :-)


> incorrect calculation distance in QuadTree
> --
>
> Key: FLINK-4148
> URL: https://issues.apache.org/jira/browse/FLINK-4148
> Project: Flink
>  Issue Type: Bug
>Reporter: Alexey Diomin
>Priority: Trivial
> Attachments: 
> 0001-FLINK-4148-incorrect-calculation-minDist-distance-in.patch
>
>
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/QuadTree.scala#L105
> Because EuclideanDistanceMetric extends SquaredEuclideanDistanceMetric, we 
> always take the first case and never reach the case that applies math.sqrt(minDist).
> The correct fix is to match EuclideanDistanceMetric first and 
> SquaredEuclideanDistanceMetric after it.
> P.S. Because EuclideanDistanceMetric is more expensive to compute and is the 
> default DistanceMetric, this can cause some performance degradation for KNN with 
> default parameters.
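
A minimal, self-contained Scala sketch of the match-order problem described above, 
using stand-in classes rather than the actual flink-ml metric types; the fix under 
review simply swaps the cases so the more specific metric is matched first.

{code}
// Stand-in hierarchy mirroring the metrics: the "Euclidean" variant extends the
// "squared" one, so a match that tests the supertype first never reaches the
// subtype case and the sqrt is never applied.
object MatchOrderSketch {
  class SquaredEuclidean
  class Euclidean extends SquaredEuclidean

  // Buggy order: the first case also matches Euclidean instances.
  def minDistBuggy(metric: SquaredEuclidean, minDist: Double): Double = metric match {
    case _: SquaredEuclidean => minDist
    case _: Euclidean        => math.sqrt(minDist) // unreachable
  }

  // Fixed order: test the more specific subtype first.
  def minDistFixed(metric: SquaredEuclidean, minDist: Double): Double = metric match {
    case _: Euclidean        => math.sqrt(minDist)
    case _: SquaredEuclidean => minDist
  }

  def main(args: Array[String]): Unit = {
    println(minDistBuggy(new Euclidean, 4.0)) // 4.0 -- sqrt skipped
    println(minDistFixed(new Euclidean, 4.0)) // 2.0 -- correct
  }
}
{code}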



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5397.
---

> Fail to deserialize savepoints in v1.1 when there exist missing fields in 
> class serialization descriptors
> -
>
> Key: FLINK-5397
> URL: https://issues.apache.org/jira/browse/FLINK-5397
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Stefan Richter
> Fix For: 1.2.0, 1.3.0
>
>
> To restore savepoints taken with previous versions, Flink now keeps all 
> classes whose serialization has changed and puts them in a separate package 
> ("migration").
> When deserializing old savepoints, Flink looks up the correct descriptors 
> ({{ObjectStreamClass}}) for these classes instead of using the ones written 
> in the serialized data. The implementation is problematic, however, when field 
> descriptors are missing from the serialized data.
> When serializing an object, Java only writes the descriptors of the 
> non-null fields. But when we look up class descriptors from the given classes, 
> all fields are put into the descriptors. As a result, we deserialize 
> the savepoints with incorrect descriptors, leading to serialization 
> exceptions.
> A simple resolution is to update the name of the descriptors read from the 
> stream via reflection, instead of substituting different descriptors.
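
A rough Scala sketch of the resolution described above, not the actual Flink 
implementation: keep the descriptor that was written into the stream (and thus its 
field layout) and only rewrite its class name via reflection. The class and map names 
are illustrative; on Java 9+ the reflective access would additionally require opening 
java.io.

{code}
import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}

class RenamingObjectInputStream(in: InputStream, renames: Map[String, String])
    extends ObjectInputStream(in) {

  override protected def readClassDescriptor(): ObjectStreamClass = {
    // Use the descriptor read from the savepoint so the serialized field layout
    // (which may omit fields that were null) is preserved ...
    val desc = super.readClassDescriptor()
    renames.get(desc.getName).foreach { migrationClassName =>
      // ... and only rewrite its private "name" field to point at the
      // relocated ("migration") class.
      val nameField = classOf[ObjectStreamClass].getDeclaredField("name")
      nameField.setAccessible(true)
      nameField.set(desc, migrationClassName)
    }
    desc
  }
}
{code}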



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3051: [FLINK-5399] Add more information to checkpoint re...

2017-01-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3051


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4890) FileInputFormatTest#testExcludeFiles fails on Windows OS

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4890.
---

> FileInputFormatTest#testExcludeFiles fails on Windows OS
> 
>
> Key: FLINK-4890
> URL: https://issues.apache.org/jira/browse/FLINK-4890
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.1.3
> Environment: Windows 10
>Reporter: Chesnay Schepler
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.3.0
>
>
> Running the mentioned test leads to an exception:
> {code}
> Illegal char <:> at index 2: 
> /C:/dev/cygwin64/tmp/junit3838395086498044255/another_file.bin
> java.nio.file.InvalidPathException: Illegal char <:> at index 2: 
> /C:/dev/cygwin64/tmp/junit3838395086498044255/another_file.bin
> at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182)
> at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153)
> at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77)
> at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94)
> at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255)
> at java.nio.file.Paths.get(Paths.java:84)
> at 
> org.apache.flink.api.common.io.GlobFilePathFilter.filterPath(GlobFilePathFilter.java:95)
> at 
> org.apache.flink.api.common.io.FileInputFormat.acceptFile(FileInputFormat.java:644)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:600)
> at 
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:476)
> at 
> org.apache.flink.api.common.io.FileInputFormatTest.testReadMultiplePatterns(FileInputFormatTest.java:362)
> {code}
> The problem is that we are given a Flink Path, which is then converted to a 
> String and handed to the nio FileSystem. The passed path is thus /C:/..., 
> which nio can't work with.
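
A hedged Scala sketch of one way to normalize such paths before handing them to nio; 
this is illustrative only and not necessarily the fix that was merged.

{code}
import java.nio.file.{Path, Paths}

object WindowsPathSketch {
  // A URI-style path like "/C:/tmp/a.bin" is rejected by the Windows nio file
  // system; dropping the leading slash yields an acceptable "C:/tmp/a.bin".
  def toNioPath(pathString: String): Path =
    if (pathString.matches("/[A-Za-z]:/.*")) Paths.get(pathString.substring(1))
    else Paths.get(pathString)

  def main(args: Array[String]): Unit = {
    println(toNioPath("/C:/dev/cygwin64/tmp/another_file.bin"))
  }
}
{code}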



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5399) Add more information to checkpoint result of TriggerSavepointSuccess

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804162#comment-15804162
 ] 

ASF GitHub Bot commented on FLINK-5399:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3051


> Add more information to checkpoint result of TriggerSavepointSuccess
> 
>
> Key: FLINK-5399
> URL: https://issues.apache.org/jira/browse/FLINK-5399
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: shijinkui
>
> Add checkpointId and triggerTime to TriggerSavepointSuccess so that the history 
> of triggered checkpoints can be recorded outside the Flink system.
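
A hedged Scala sketch of the shape the acknowledgement message could take after the 
change; the field names and types are assumptions for illustration, not the actual 
class in flink-runtime.

{code}
import org.apache.flink.api.common.JobID

// Hypothetical shape of the savepoint acknowledgement with the two extra fields.
case class TriggerSavepointSuccess(
    jobId: JobID,          // job the savepoint belongs to
    checkpointId: Long,    // id of the checkpoint backing the savepoint
    savepointPath: String, // where the savepoint was written
    triggerTime: Long)     // wall-clock time (ms) at which it was triggered
{code}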



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4890) FileInputFormatTest#testExcludeFiles fails on Windows OS

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4890.
-
   Resolution: Fixed
Fix Version/s: 1.3.0
   1.2.0

Fixed in
  - 1.2.0 via fb48c3b4cbc5a186cb7b812c8d05833c5852b385
  - 1.3.0 via 15bd1f128f3d2ae1cad68e89355f9c5045b8e830

> FileInputFormatTest#testExcludeFiles fails on Windows OS
> 
>
> Key: FLINK-4890
> URL: https://issues.apache.org/jira/browse/FLINK-4890
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.1.3
> Environment: Windows 10
>Reporter: Chesnay Schepler
>Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.3.0
>
>
> Running the mentioned test leads to an exception:
> {code}
> Illegal char <:> at index 2: 
> /C:/dev/cygwin64/tmp/junit3838395086498044255/another_file.bin
> java.nio.file.InvalidPathException: Illegal char <:> at index 2: 
> /C:/dev/cygwin64/tmp/junit3838395086498044255/another_file.bin
> at sun.nio.fs.WindowsPathParser.normalize(WindowsPathParser.java:182)
> at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:153)
> at sun.nio.fs.WindowsPathParser.parse(WindowsPathParser.java:77)
> at sun.nio.fs.WindowsPath.parse(WindowsPath.java:94)
> at sun.nio.fs.WindowsFileSystem.getPath(WindowsFileSystem.java:255)
> at java.nio.file.Paths.get(Paths.java:84)
> at 
> org.apache.flink.api.common.io.GlobFilePathFilter.filterPath(GlobFilePathFilter.java:95)
> at 
> org.apache.flink.api.common.io.FileInputFormat.acceptFile(FileInputFormat.java:644)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:600)
> at 
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:476)
> at 
> org.apache.flink.api.common.io.FileInputFormatTest.testReadMultiplePatterns(FileInputFormatTest.java:362)
> {code}
> The problem is that we are given a Flink Path, which is then converted to a 
> String and handed to the nio FileSystem. The passed path is thus /C:/..., 
> which nio can't work with.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3053: [FLINK-5400] Add accessor to folding states in Run...

2017-01-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3053


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5400) Add accessor to folding states in RuntimeContext

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804161#comment-15804161
 ] 

ASF GitHub Bot commented on FLINK-5400:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3053


> Add accessor to folding states in RuntimeContext
> 
>
> Key: FLINK-5400
> URL: https://issues.apache.org/jira/browse/FLINK-5400
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now {{RuntimeContext}} does not provide the accessors to folding states. 
> Therefore users cannot use folding states in their rich functions. I think we 
> should provide the missing accessor.
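
A hedged Scala sketch of how a rich function could use such an accessor on a keyed 
stream; it assumes a getFoldingState(descriptor) method analogous to the existing 
getListState/getReducingState accessors.

{code}
import org.apache.flink.api.common.functions.{FoldFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state.{FoldingState, FoldingStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Keeps a per-key running sum in folding state and emits it for every element.
class RunningSum extends RichFlatMapFunction[Long, Long] {

  private var sum: FoldingState[Long, Long] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new FoldingStateDescriptor[Long, Long](
      "runningSum",
      0L, // initial accumulator value
      new FoldFunction[Long, Long] {
        override def fold(acc: Long, value: Long): Long = acc + value
      },
      classOf[Long])
    // Assumed accessor; the rest of the keyed-state API is used as documented.
    sum = getRuntimeContext.getFoldingState(descriptor)
  }

  override def flatMap(value: Long, out: Collector[Long]): Unit = {
    sum.add(value)       // fold the new element into the keyed state
    out.collect(sum.get) // emit the current accumulated value
  }
}
{code}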



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5397) Fail to deserialize savepoints in v1.1 when there exist missing fields in class serialization descriptors

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5397.
-
   Resolution: Fixed
Fix Version/s: 1.3.0
   1.2.0

Fixed in
  - 1.2.0 via 3554c96d118a411906a22b1f1087de073617a4c7
  - 1.3.0 via 09614cc82486b2682ba08876b47019cc604574ed

> Fail to deserialize savepoints in v1.1 when there exist missing fields in 
> class serialization descriptors
> -
>
> Key: FLINK-5397
> URL: https://issues.apache.org/jira/browse/FLINK-5397
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Stefan Richter
> Fix For: 1.2.0, 1.3.0
>
>
> To restore savepoints taken with previous versions, Flink now keeps all 
> classes whose serialization has changed and puts them in a separate package 
> ("migration").
> When deserializing old savepoints, Flink looks up the correct descriptors 
> ({{ObjectStreamClass}}) for these classes instead of using the ones written 
> in the serialized data. The implementation is problematic, however, when field 
> descriptors are missing from the serialized data.
> When serializing an object, Java only writes the descriptors of the 
> non-null fields. But when we look up class descriptors from the given classes, 
> all fields are put into the descriptors. As a result, we deserialize 
> the savepoints with incorrect descriptors, leading to serialization 
> exceptions.
> A simple resolution is to update the name of the descriptors read from the 
> stream via reflection, instead of substituting different descriptors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5390) input should be closed in finally block in YarnFlinkApplicationMasterRunner#loadJobGraph()

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5390.
-
   Resolution: Fixed
Fix Version/s: 1.3.0

Fixed via 9f7ad84abf7f1c33c4ee40be1eb0297a28a30f57

Thank you for the contribution!

> input should be closed in finally block in 
> YarnFlinkApplicationMasterRunner#loadJobGraph()
> --
>
> Key: FLINK-5390
> URL: https://issues.apache.org/jira/browse/FLINK-5390
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Roman Maier
>Priority: Minor
> Fix For: 1.3.0
>
>
> {code}
> FileInputStream input = new FileInputStream(fp);
> ObjectInputStream obInput = new ObjectInputStream(input);
> jg = (JobGraph) obInput.readObject();
> input.close();
> {code}
> If readObject() throws an exception, input is left unclosed.
> A similar issue exists in AbstractYarnClusterDescriptor#startAppMaster(), around 
> line 726.
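
A small Scala sketch of the suggested fix: close the stream in a finally block so it 
is released even when readObject() throws. The method and file names are illustrative.

{code}
import java.io.{FileInputStream, ObjectInputStream}

import org.apache.flink.runtime.jobgraph.JobGraph

object JobGraphLoaderSketch {
  def loadJobGraph(path: String): JobGraph = {
    val input = new FileInputStream(path)
    try {
      val objectInput = new ObjectInputStream(input)
      objectInput.readObject().asInstanceOf[JobGraph]
    } finally {
      input.close() // also releases the wrapping ObjectInputStream's resources
    }
  }
}
{code}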



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5390) input should be closed in finally block in YarnFlinkApplicationMasterRunner#loadJobGraph()

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5390.
---

> input should be closed in finally block in 
> YarnFlinkApplicationMasterRunner#loadJobGraph()
> --
>
> Key: FLINK-5390
> URL: https://issues.apache.org/jira/browse/FLINK-5390
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Roman Maier
>Priority: Minor
> Fix For: 1.3.0
>
>
> {code}
> FileInputStream input = new FileInputStream(fp);
> ObjectInputStream obInput = new ObjectInputStream(input);
> jg = (JobGraph) obInput.readObject();
> input.close();
> {code}
> If readObject() throws an exception, input is left unclosed.
> A similar issue exists in AbstractYarnClusterDescriptor#startAppMaster(), around 
> line 726.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5399) Add more information to checkpoint result of TriggerSavepointSuccess

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5399.
---

> Add more information to checkpoint result of TriggerSavepointSuccess
> 
>
> Key: FLINK-5399
> URL: https://issues.apache.org/jira/browse/FLINK-5399
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: shijinkui
>Assignee: shijinkui
> Fix For: 1.3.0
>
>
> Add checkpointId and triggerTime to TriggerSavepointSuccess so that the history 
> of triggered checkpoints can be recorded outside the Flink system.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5400) Add accessor to folding states in RuntimeContext

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5400.
---

> Add accessor to folding states in RuntimeContext
> 
>
> Key: FLINK-5400
> URL: https://issues.apache.org/jira/browse/FLINK-5400
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
> Fix For: 1.3.0
>
>
> Now {{RuntimeContext}} does not provide the accessors to folding states. 
> Therefore users cannot use folding states in their rich functions. I think we 
> should provide the missing accessor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5400) Add accessor to folding states in RuntimeContext

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5400.
-
   Resolution: Fixed
Fix Version/s: 1.3.0

Fixed in d63f831a4b11bb927a8cc216b4901d9262e44053

Thank you for the contribution!

> Add accessor to folding states in RuntimeContext
> 
>
> Key: FLINK-5400
> URL: https://issues.apache.org/jira/browse/FLINK-5400
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
> Fix For: 1.3.0
>
>
> Now {{RuntimeContext}} does not provide the accessors to folding states. 
> Therefore users cannot use folding states in their rich functions. I think we 
> should provide the missing accessor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-06 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804080#comment-15804080
 ] 

Fabian Hueske commented on FLINK-2168:
--

Hi [~ram_krish], yes. That would be great.
We should wait for FLINK-5280 as well, but that issue is very close to being 
resolved.

I'll assign the issue to you.

> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Wilmer DAZA
>Priority: Minor
>  Labels: starter
>
> Add an {{HBaseTableSource}} to read data from an HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2168) Add HBaseTableSource

2017-01-06 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-2168:
-
Assignee: ramkrishna.s.vasudevan  (was: Wilmer DAZA)

> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add an {{HBaseTableSource}} to read data from an HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804100#comment-15804100
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
I just had a crazy thought. :-) 
What do you think about moving `getFieldNames()` and `getFieldIndicies()` 
into a separate trait / interface maybe `DefinedFieldNames`?


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.
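
As an illustration of what "nested rows" means here, a small Scala sketch of a nested 
row schema (as a JSON or Avro source might produce) that an extended TableSource 
should be able to expose as its return type; the field names are made up.

{code}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

object NestedSchemaSketch {
  // A nested "address" row inside a "person" row.
  val address: RowTypeInfo = new RowTypeInfo(
    Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
    Array("street", "city"))

  val person: RowTypeInfo = new RowTypeInfo(
    Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, address),
    Array("name", "age", "address"))
}
{code}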



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2017-01-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
I just had a crazy thought. :-) 
What do you think about moving `getFieldNames()` and `getFieldIndicies()` 
into a separate trait / interface maybe `DefinedFieldNames`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2017-01-06 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
I don't think I got the idea :) Could you elaborate on it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804108#comment-15804108
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
I don't think I got the idea :) Could you elaborate on it?


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3052: Swap the pattern matching order

2017-01-06 Thread Fokko
Github user Fokko closed the pull request at:

https://github.com/apache/flink/pull/3052


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3052: Swap the pattern matching order

2017-01-06 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/3052
  
Alright, just rebased onto master. Looks like Travis is working 
again, good job!

Cheers!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2977: [FLINK-5084] Replace Java Table API integration tests by ...

2017-01-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2977
  
Thanks for the update and rebasing @mtunique!

PR is good to merge (one file should be moved, see comment above).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5084) Replace Java Table API integration tests by unit tests

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804138#comment-15804138
 ] 

ASF GitHub Bot commented on FLINK-5084:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2977
  
Thanks for the update and rebasing @mtunique!

PR is good to merge (one file should be moved, see comment above).


> Replace Java Table API integration tests by unit tests
> --
>
> Key: FLINK-5084
> URL: https://issues.apache.org/jira/browse/FLINK-5084
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Priority: Minor
>
> The Java Table API is a wrapper on top of the Scala Table API. 
> Instead of operating directly with Expressions like the Scala API, the Java 
> API accepts a String parameter which is parsed into Expressions.
> We could therefore replace the Java Table API ITCases by tests that check 
> that the parsing step produces a valid logical plan.
> This could be done by creating two {{Table}} objects for an identical query 
> once with the Scala Expression API and once with the Java String API and 
> comparing the logical plans of both {{Table}} objects. Basically something 
> like the following:
> {code}
> val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
> 'c)
> val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 
> 'g, 'h)
> val joinT1 = ds1.join(ds2).where('b === 'e).select('c, 'g)
> val joinT2 = ds1.join(ds2).where("b = e").select("c, g")
> val lPlan1 = joinT1.logicalPlan
> val lPlan2 = joinT2.logicalPlan
> Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2442: [FLINK-4148] incorrect calculation minDist distance in Qu...

2017-01-06 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/2442
  
Looks good, please merge. Should have been fixed long ago :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804149#comment-15804149
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
We remove the methods from `TableSource` and add them to an interface. If a 
table source does not implement the methods, we use the names provided by the 
`TypeInformation`. If the table source implements the methods, we use those 
names. The distinction is made in `TableSourceTable`.

What do you think?
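
A hedged Scala sketch of the proposal, using simplified stand-in types rather than 
the actual flink-table classes: the field-name methods move into an optional trait, 
and TableSourceTable falls back to names derived from the TypeInformation when the 
trait is not implemented.

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation

// Simplified stand-ins; the real TableSource/TableSourceTable live in flink-table.
trait TableSource[T] {
  def getReturnType: TypeInformation[T]
}

trait DefinedFieldNames {
  def getFieldNames: Array[String]
  def getFieldIndices: Array[Int]
}

class TableSourceTable[T](source: TableSource[T]) {
  val fieldNames: Array[String] = source match {
    case named: DefinedFieldNames => named.getFieldNames // explicitly defined names
    case _                        => defaultFieldNames(source.getReturnType)
  }

  // Placeholder for the default logic (the real code would call the static
  // helper on TableEnvironment to derive names from the TypeInformation).
  private def defaultFieldNames(t: TypeInformation[T]): Array[String] =
    (0 until t.getArity).map(i => s"f$i").toArray
}
{code}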


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2017-01-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
We remove the methods from `TableSource` and add them to an interface. If a 
table source does not implement the methods, we use the names provided by the 
`TypeInformation`. If the table source implements the methods, we use those 
names. The distinction is made in `TableSourceTable`.

What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-5399) Add more information to checkpoint result of TriggerSavepointSuccess

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5399.
-
   Resolution: Implemented
 Assignee: shijinkui
Fix Version/s: 1.3.0

Implemented in d156f8d73cdd152da924f1923a615374258d5015

Thank you for the contribution!

> Add more information to checkpoint result of TriggerSavepointSuccess
> 
>
> Key: FLINK-5399
> URL: https://issues.apache.org/jira/browse/FLINK-5399
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: shijinkui
>Assignee: shijinkui
> Fix For: 1.3.0
>
>
> Add checkpointId and triggerTime to TriggerSavepointSuccess so that the history 
> of triggered checkpoints can be recorded outside the Flink system.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3048: Clarified the import path of the Breeze DenseVecto...

2017-01-06 Thread Fokko
Github user Fokko closed the pull request at:

https://github.com/apache/flink/pull/3048


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5319) ClassCastException when reusing an inherited method reference as KeySelector for different classes

2017-01-06 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804274#comment-15804274
 ] 

Timo Walther commented on FLINK-5319:
-

Would this change break binary backwards compatibility? I'm not sure if the 
community would accept this change only because of a Java bug that might be 
solved in future releases.

> ClassCastException when reusing an inherited method reference as KeySelector 
> for different classes
> --
>
> Key: FLINK-5319
> URL: https://issues.apache.org/jira/browse/FLINK-5319
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Alexander Chermenin
>Assignee: Timo Walther
>
> Code sample:
> {code}static abstract class A {
> int id;
> A(int id) {this.id = id; }
> int getId() { return id; }
> }
> static class B extends A { B(int id) { super(id % 3); } }
> static class C extends A { C(int id) { super(id % 2); } }
> private static B b(int id) { return new B(id); }
> private static C c(int id) { return new C(id); }
> /**
>  * Main method.
>  */
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment environment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new);
> C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new);
> DataStreamSource<B> bStream = environment.fromElements(bs);
> DataStreamSource<C> cStream = environment.fromElements(cs);
> bStream.keyBy((KeySelector<B, Integer>) A::getId).print();
> cStream.keyBy((KeySelector<C, Integer>) A::getId).print();
> environment.execute();
> }
> {code}
> This code throws the following exception:
> {code}Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
>   at 
> org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not extract key from 
> org.sample.flink.examples.Test$C@5e1a8111
>   at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>   at 
> 

[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2017-01-06 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
I don't think we will gain a lot with this. Even if we remove these two 
methods from the `TableSource` trait, there is still the `getTypeIndices` 
method, and Java users will have to implement it if they are going to implement 
the `TableSource` trait. And if a user knows how to inherit a trait with one 
method, he/she will be able to inherit a trait with three methods.

The second problem with this approach is that it is not really 
object-oriented. We would have to rely on reflection tricks (probably sugared 
with pattern matching), while simply having three methods is a cleaner OO 
solution.

What if we leave all three methods and simply add some base Java 
implementations that already implement these traits? Something like 
`JavaBatchTableSource`, `JavaStreamTableSource`, and 
`JavaBatchStreamTableSource`? Then users would not need to struggle with 
trait inheritance issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2017-01-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
Which `getTypeIndicies` method are you referring to? `TableSource` only 
has `getReturnType`, `getFieldNames` and `getFieldIndicies`. If we move 
the latter two to a separate interface, only `getReturnType` is left.

Also I think this is typical OO design. We do not need reflection to check 
if an object implements an interface. That's a very common operation in Java 
and Scala. A simple `isInstanceOf[DefinesFieldNames]` check in `TableSourceTable` is 
sufficient to determine whether the table source implements the interface or not.

Isn't this a good compromise between a lean interface (also simple for 
Java users) and the possibility to override field names if necessary?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804341#comment-15804341
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3039
  
I like this idea. In this way, we only need to provide the `BatchTableSource` 
and `StreamTableSource` interfaces, without involving the odd 
`BatchStreamTableSource`. We can keep the interface very clean.

If I understand correctly, none of the concrete implementations of `TableSource` 
will implement `DefinesFieldNames` for now?


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2017-01-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
@mushketyk  no worries :-)

@wuchong Since the methods are only added when needed by implementing the 
interface there is no default implementation. The logic of the default 
implementation (calling the static method of `TableEnvironment`) is directly 
put into the `TableSourceTable` and only replaced if the table source 
implements the new interface.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804349#comment-15804349
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
@mushketyk  no worries :-)

@wuchong Since the methods are only added when needed by implementing the 
interface there is no default implementation. The logic of the default 
implementation (calling the static method of `TableEnvironment`) is directly 
put into the `TableSourceTable` and only replaced if the table source 
implements the new interface.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804281#comment-15804281
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
I don't think we will gain a lot with this. Even if we remove these two 
methods from the `TableSource` trait, there is still the `getTypeIndices` 
method, and Java users will have to implement it if they are going to implement 
the `TableSource` trait. And if a user knows how to inherit a trait with one 
method, he/she will be able to inherit a trait with three methods.

The second problem with this approach is that it is not really 
object-oriented. We would have to rely on reflection tricks (probably sugared 
with pattern matching), while simply having three methods is a cleaner OO 
solution.

What if we leave all three methods and simply add some base Java 
implementations that already implement these traits? Something like 
`JavaBatchTableSource`, `JavaStreamTableSource`, and 
`JavaBatchStreamTableSource`? Then users would not need to struggle with 
trait inheritance issues.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4692) Add tumbling group-windows for batch tables

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804286#comment-15804286
 ] 

ASF GitHub Bot commented on FLINK-4692:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2938
  
I would like to shepherd this PR.


> Add tumbling group-windows for batch tables
> ---
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> Add Tumble group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2938: [FLINK-4692] [tableApi] Add tumbling group-windows for ba...

2017-01-06 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2938
  
I would like to shepherd this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5418) Estimated row size does not support nested types

2017-01-06 Thread Timo Walther (JIRA)
Timo Walther created FLINK-5418:
---

 Summary: Estimated row size does not support nested types
 Key: FLINK-5418
 URL: https://issues.apache.org/jira/browse/FLINK-5418
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Operations that use 
{{org.apache.flink.table.plan.nodes.FlinkRel#estimateRowSize}} do not support 
nested types yet and fail with:

{code}
java.lang.AssertionError: Internal error: Error occurred while applying rule 
DataSetMinusRule

at org.apache.calcite.util.Util.newInternal(Util.java:792)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
at 
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819)
at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
at 
org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:256)
at 
org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:288)
at 
org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
at 
org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)
at 
org.apache.flink.table.api.scala.batch.table.SetOperatorsITCase.testMinus(SetOperatorsITCase.scala:175)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253)
at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.TableException: Unsupported data 

[jira] [Updated] (FLINK-5418) Estimated row size does not support nested types

2017-01-06 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-5418:

Affects Version/s: 1.2.0

> Estimated row size does not support nested types
> 
>
> Key: FLINK-5418
> URL: https://issues.apache.org/jira/browse/FLINK-5418
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Operations that use 
> {{org.apache.flink.table.plan.nodes.FlinkRel#estimateRowSize}} do not support 
> nested types yet and fail with:
> {code}
> java.lang.AssertionError: Internal error: Error occurred while applying rule 
> DataSetMinusRule
>   at org.apache.calcite.util.Util.newInternal(Util.java:792)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
>   at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:256)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:288)
>   at 
> org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
>   at 
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)
>   at 
> org.apache.flink.table.api.scala.batch.table.SetOperatorsITCase.testMinus(SetOperatorsITCase.scala:175)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> 

[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804300#comment-15804300
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3039
  
Which `getTypeIndicies` methods are you referring to? `TableSource` only 
has `getReturnType`, `getFieldNames` and `getFieldIndicies`. If we move 
the latter two to a separate interface, only `getReturnType` is left.

Also, I think this is typical OO design. We do not need reflection to check 
whether an object implements an interface; that is a very common operation in Java 
and Scala. A simple `isInstanceOf[DefinesFieldNames]` check in `TableSourceTable` is 
sufficient to determine whether the table source implements the interface or not.

Isn't this a good compromise: a lean interface (also simple for 
Java users) and, at the same time, the possibility to override field names if 
necessary?
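
For illustration, a minimal Java sketch of that check; the `DefinesFieldNames` interface and the fallback-to-default-names helper are assumptions drawn from this discussion, not the actual Flink API:

```java
// Hypothetical sketch of the discussed design, not the actual Flink interfaces.
public interface DefinesFieldNames {
    String[] getFieldNames();
}

class FieldNameResolutionSketch {
    // If the table source opts in via DefinesFieldNames, use its names;
    // otherwise fall back to names derived from the return type.
    static String[] resolveFieldNames(Object tableSource, String[] defaultNames) {
        if (tableSource instanceof DefinesFieldNames) {
            return ((DefinesFieldNames) tableSource).getFieldNames();
        }
        return defaultNames;
    }
}
```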


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2017-01-06 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
@fhueske 

Sorry, there are only two methods. Please ignore my comment :)

I think you are right and this seems like a good approach. If @wuchong is 
on board with this I'll update the PR accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3037: Flink-4450 update storm version to 1.0

2017-01-06 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3037#discussion_r9492
  
--- Diff: 
flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 ---
@@ -186,15 +186,15 @@ public void testCreateTopologyContext() {
.shuffleGrouping("bolt2", 
TestDummyBolt.groupingStreamId)
.shuffleGrouping("bolt2", 
TestDummyBolt.shuffleStreamId);
 
-   LocalCluster cluster = new LocalCluster();
-   Config c = new Config();
-   c.setNumAckers(0);
-   cluster.submitTopology("test", c, builder.createTopology());
-
-   while (TestSink.result.size() != 8) {
-   Utils.sleep(100);
-   }
-   cluster.shutdown();
+// LocalCluster cluster = new LocalCluster();
--- End diff --

Are these lines accidentally commented out, or is this not working with 
Storm 1.0 any more?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804333#comment-15804333
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
@fhueske 

Sorry, there are only two methods. Please ignore my comment :)

I think you are right and this seems like a good approach. If @wuchong is 
on board with this I'll update the PR accordingly.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2017-01-06 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3039
  
I like this idea. In this way, we only need to provide the `BatchTableSource` 
and `StreamTableSource` interfaces, without involving the odd 
`BatchStreamTableSource`. We can keep the interface very clean.

If I understand right, none of the concrete implementations of `TableSource` 
will implement `DefinesFieldNames` for now?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5401) Fails ConnectionUtilsTest#testReturnLocalHostAddressUsingHeuristics

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5401.
---

> Fails ConnectionUtilsTest#testReturnLocalHostAddressUsingHeuristics
> ---
>
> Key: FLINK-5401
> URL: https://issues.apache.org/jira/browse/FLINK-5401
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.3
> Environment: macOS
>Reporter: Anton Solovev
>
> {code}
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.runtime.net.ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics(ConnectionUtilsTest.java:45)
> {code}
> in org.apache.flink.runtime.net.ConnectionUtilsTest while testing 1.1.3 RC2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5401) Fails ConnectionUtilsTest#testReturnLocalHostAddressUsingHeuristics

2017-01-06 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5401.
-
Resolution: Invalid

Has already been solved in newer versions of Flink via 
36c09b0996404eac47abfc3ef8387a5f353f9756

> Fails ConnectionUtilsTest#testReturnLocalHostAddressUsingHeuristics
> ---
>
> Key: FLINK-5401
> URL: https://issues.apache.org/jira/browse/FLINK-5401
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.3
> Environment: macOS
>Reporter: Anton Solovev
>
> {code}
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> org.apache.flink.runtime.net.ConnectionUtilsTest.testReturnLocalHostAddressUsingHeuristics(ConnectionUtilsTest.java:45)
> {code}
> in org.apache.flink.runtime.net.ConnectionUtilsTest while testing 1.1.3 RC2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3071: [FLINK-5417][DOCUMENTATION]correct the wrong config file ...

2017-01-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3071
  
You should be able to open the SVG file with any text editor and add the 
license manually.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5417) Fix the wrong config file name

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804374#comment-15804374
 ] 

ASF GitHub Bot commented on FLINK-5417:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3071
  
You should be able to open the SVG file with any text editor and add the 
license manually.


> Fix the wrong config file name 
> ---
>
> Key: FLINK-5417
> URL: https://issues.apache.org/jira/browse/FLINK-5417
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Tao Wang
>Priority: Trivial
>
> As the config file name is conf/flink-conf.yaml, the usage of 
> "conf/flink-config.yaml" in the documentation is wrong and can easily confuse users. We 
> should correct it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3072: [Flink-5378] Update Scopt version to 3.5.0

2017-01-06 Thread LorenzBuehmann
GitHub user LorenzBuehmann opened a pull request:

https://github.com/apache/flink/pull/3072

[Flink-5378] Update Scopt version to 3.5.0

This will also allow for using comma-separated values in the CLI.

Note that, as per https://github.com/scopt/scopt/releases/tag/v3.5.0, Scopt 3.5.0 
introduces two-column rendering for the usage text, which is enabled by 
default. From my point of view this is even better, as it's more compact and 
easier to read.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/LorenzBuehmann/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3072.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3072


commit 85aa6079fe610fe4cce066da90edcc7cf87a9704
Author: Lorenz Buehmann 
Date:   2017-01-06T12:34:13Z

[Flink-5378] Bumped Scopt version to 3.5.0.

This will also allow for using comma-separated values in the CLI.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3070: [FLINK-5119][web-frontend] Fix problems in displaying TM ...

2017-01-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3070
  
Changes look good, adding them to my next batch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3069: [FLINK-5381][web-frontend] Fix scrolling issues

2017-01-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3069
  
Changes look good, adding them to my next batch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804502#comment-15804502
 ] 

ASF GitHub Bot commented on FLINK-5381:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3069
  
Changes look good, adding them to my next batch.


> Scrolling in some web interface pages doesn't work (taskmanager details, 
> jobmanager config)
> ---
>
> Key: FLINK-5381
> URL: https://issues.apache.org/jira/browse/FLINK-5381
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>
> It seems that scrolling in the web interface doesn't work anymore on some 
> pages in the 1.2 release branch.
> Example pages: 
> - When you click the "JobManager" tab
> - The TaskManager logs page



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5119) Last taskmanager heartbeat not showing in web frontend

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804503#comment-15804503
 ] 

ASF GitHub Bot commented on FLINK-5119:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3070
  
Changes look good, adding them to my next batch.


> Last taskmanager heartbeat not showing in web frontend
> --
>
> Key: FLINK-5119
> URL: https://issues.apache.org/jira/browse/FLINK-5119
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.1.3
>Reporter: Ufuk Celebi
>
> The web frontend does not list anything for the last heartbeat in the web 
> frontend.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5419) Taskmanager metrics not accessible via REST

2017-01-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-5419:
---

 Summary: Taskmanager metrics not accessible via REST
 Key: FLINK-5419
 URL: https://issues.apache.org/jira/browse/FLINK-5419
 Project: Flink
  Issue Type: Bug
  Components: Metrics, Webfrontend
Affects Versions: 1.2.0, 1.3.0
Reporter: Chesnay Schepler
Priority: Blocker
 Fix For: 1.2.0, 1.3.0


There is currently a URL clash between the TaskManagersHandler and 
TaskManagerMetricsHandler, with both being routed to
{code}
/taskmanagers/:taskmanagerid/metrics
{code}
As a result it is not possible to query the full set of metrics for a 
taskmanager, but only the hard-coded subset that is displayed on the metrics 
tab on the taskmanager page.

This is a side-effect of 6d53bbc4b92e651786ecc8c2c6dfeb8e450a16a3 making the 
URLs more consistent. The TaskManager page in the web interface has 3 tabs: 
Metrics, Log and Stdout.

The URLs for these tabs are
{code}
/taskmanager//metrics
/taskmanager//log
/taskmanager//stdout
{code}
which correspond to the REST URLs used. Previously, the metrics tab used 
{code}/taskmanager/{code}

However, 70704de0c82cbb7b143dd696221e11999feb3600 then exposed the metrics 
gathered by the metrics system through the REST API. The assumption was that 
general information for the taskmanagers is retrieved via /taskmanager/, 
similar to how the job-related URLs are structured, which sadly isn't the case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5418) Estimated row size does not support nested types

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804506#comment-15804506
 ] 

ASF GitHub Bot commented on FLINK-5418:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/3073

[FLINK-5418] [table] Estimated row size does not support nested types

This adds an estimate for array, composite and generic types.
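
For illustration, a rough Java sketch of the recursive idea behind such an estimate; the type names and byte values below are assumptions for this example, not the constants used in the PR:

```java
import java.util.List;

// Sketch only: sum leaf estimates, descending into nested (composite) types
// instead of failing on them.
final class RowSizeSketch {
    interface FieldType {}
    static final class AtomicType implements FieldType {
        final int estimatedBytes;
        AtomicType(int estimatedBytes) { this.estimatedBytes = estimatedBytes; }
    }
    static final class CompositeType implements FieldType {
        final List<FieldType> fields;
        CompositeType(List<FieldType> fields) { this.fields = fields; }
    }

    static int estimateSize(FieldType type) {
        if (type instanceof AtomicType) {
            return ((AtomicType) type).estimatedBytes;
        }
        int sum = 0;
        for (FieldType f : ((CompositeType) type).fields) {
            sum += estimateSize(f);
        }
        return sum;
    }

    public static void main(String[] args) {
        FieldType nested = new CompositeType(List.of(
            new AtomicType(4),                                                  // e.g. an INT field
            new CompositeType(List.of(new AtomicType(8), new AtomicType(12)))  // a nested row
        ));
        System.out.println(estimateSize(nested)); // prints 24
    }
}
```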

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-5418

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3073.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3073


commit 8472588f35d95c2d227a3387090d3a6e6ded343d
Author: twalthr 
Date:   2017-01-06T12:59:38Z

[FLINK-5418] [table] Estimated row size does not support nested types




> Estimated row size does not support nested types
> 
>
> Key: FLINK-5418
> URL: https://issues.apache.org/jira/browse/FLINK-5418
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Operations that use 
> {{org.apache.flink.table.plan.nodes.FlinkRel#estimateRowSize}} do not support 
> nested types yet and fail with:
> {code}
> java.lang.AssertionError: Internal error: Error occurred while applying rule 
> DataSetMinusRule
>   at org.apache.calcite.util.Util.newInternal(Util.java:792)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
>   at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:225)
>   at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:117)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:213)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:819)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:334)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:256)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:288)
>   at 
> org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
>   at 
> org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)
>   at 
> org.apache.flink.table.api.scala.batch.table.SetOperatorsITCase.testMinus(SetOperatorsITCase.scala:175)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> 

[GitHub] flink pull request #3073: [FLINK-5418] [table] Estimated row size does not s...

2017-01-06 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/3073

[FLINK-5418] [table] Estimated row size does not support nested types

This adds an estimate for array, composite and generic types.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-5418

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3073.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3073


commit 8472588f35d95c2d227a3387090d3a6e6ded343d
Author: twalthr 
Date:   2017-01-06T12:59:38Z

[FLINK-5418] [table] Estimated row size does not support nested types




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5381:

Affects Version/s: 1.3.0

> Scrolling in some web interface pages doesn't work (taskmanager details, 
> jobmanager config)
> ---
>
> Key: FLINK-5381
> URL: https://issues.apache.org/jira/browse/FLINK-5381
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Robert Metzger
> Fix For: 1.2.0, 1.3.0
>
>
> It seems that scrolling in the web interface doesn't work anymore on some 
> pages in the 1.2 release branch.
> Example pages: 
> - When you click the "JobManager" tab
> - The TaskManager logs page



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5381:

Assignee: Sachin Goel

> Scrolling in some web interface pages doesn't work (taskmanager details, 
> jobmanager config)
> ---
>
> Key: FLINK-5381
> URL: https://issues.apache.org/jira/browse/FLINK-5381
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> It seems that scrolling in the web interface doesn't work anymore on some 
> pages in the 1.2 release branch.
> Example pages: 
> - When you click the "JobManager" tab
> - The TaskManager logs page



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5381:

Component/s: Webfrontend

> Scrolling in some web interface pages doesn't work (taskmanager details, 
> jobmanager config)
> ---
>
> Key: FLINK-5381
> URL: https://issues.apache.org/jira/browse/FLINK-5381
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> It seems that scrolling in the web interface doesn't work anymore on some 
> pages in the 1.2 release branch.
> Example pages: 
> - When you click the "JobManager" tab
> - The TaskManager logs page



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5381) Scrolling in some web interface pages doesn't work (taskmanager details, jobmanager config)

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5381:

Fix Version/s: 1.3.0
   1.2.0

> Scrolling in some web interface pages doesn't work (taskmanager details, 
> jobmanager config)
> ---
>
> Key: FLINK-5381
> URL: https://issues.apache.org/jira/browse/FLINK-5381
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> It seems that scrolling in the web interface doesn't work anymore on some 
> pages in the 1.2 release branch.
> Example pages: 
> - When you click the "JobManager" tab
> - The TaskManager logs page



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5119) Last taskmanager heartbeat not showing in web frontend

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5119:

Affects Version/s: 1.3.0
   1.2.0

> Last taskmanager heartbeat not showing in web frontend
> --
>
> Key: FLINK-5119
> URL: https://issues.apache.org/jira/browse/FLINK-5119
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.1.3, 1.3.0
>Reporter: Ufuk Celebi
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> The web frontend does not list anything for the last heartbeat in the web 
> frontend.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5359) Job Exceptions view doesn't scroll

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5359:

Fix Version/s: 1.3.0
   1.2.0

> Job Exceptions view doesn't scroll 
> ---
>
> Key: FLINK-5359
> URL: https://issues.apache.org/jira/browse/FLINK-5359
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Eron Wright 
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> The exception information is cut off because the details panel doesn't have a 
> scrollbar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5359) Job Exceptions view doesn't scroll

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5359:

Assignee: Sachin Goel

> Job Exceptions view doesn't scroll 
> ---
>
> Key: FLINK-5359
> URL: https://issues.apache.org/jira/browse/FLINK-5359
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Eron Wright 
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> The exception information is cut off because the details panel doesn't have a 
> scrollbar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5267) TaskManager logs not scrollable

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5267:

Affects Version/s: 1.3.0
   1.2.0

> TaskManager logs not scrollable
> ---
>
> Key: FLINK-5267
> URL: https://issues.apache.org/jira/browse/FLINK-5267
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
> Environment: DC/OS 1.8
>Reporter: Mischa Krüger
> Fix For: 1.2.0, 1.3.0
>
>
> Latest master build: I ran the quickstart example successfully and wanted 
> to see the TM logs, but they can't be scrolled. Downloading the log is luckily 
> still possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5359) Job Exceptions view doesn't scroll

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5359:

Affects Version/s: 1.3.0
   1.2.0

> Job Exceptions view doesn't scroll 
> ---
>
> Key: FLINK-5359
> URL: https://issues.apache.org/jira/browse/FLINK-5359
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Eron Wright 
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> The exception information is cut off because the details panel doesn't have a 
> scrollbar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5119) Last taskmanager heartbeat not showing in web frontend

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5119:

Fix Version/s: 1.3.0
   1.2.0

> Last taskmanager heartbeat not showing in web frontend
> --
>
> Key: FLINK-5119
> URL: https://issues.apache.org/jira/browse/FLINK-5119
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.1.3, 1.3.0
>Reporter: Ufuk Celebi
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> The web frontend does not list anything for the last heartbeat in the web 
> frontend.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5119) Last taskmanager heartbeat not showing in web frontend

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5119:

Assignee: Sachin Goel

> Last taskmanager heartbeat not showing in web frontend
> --
>
> Key: FLINK-5119
> URL: https://issues.apache.org/jira/browse/FLINK-5119
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.1.3, 1.3.0
>Reporter: Ufuk Celebi
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> The web frontend does not list anything for the last heartbeat in the web 
> frontend.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5267) TaskManager logs not scrollable

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5267:

Fix Version/s: 1.3.0
   1.2.0

> TaskManager logs not scrollable
> ---
>
> Key: FLINK-5267
> URL: https://issues.apache.org/jira/browse/FLINK-5267
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
> Environment: DC/OS 1.8
>Reporter: Mischa Krüger
> Fix For: 1.2.0, 1.3.0
>
>
> Latest master build: I ran the quickstart example successfully and wanted 
> to see the TM logs, but they can't be scrolled. Downloading the log is luckily 
> still possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5267) TaskManager logs not scrollable

2017-01-06 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-5267:

Assignee: Sachin Goel

> TaskManager logs not scrollable
> ---
>
> Key: FLINK-5267
> URL: https://issues.apache.org/jira/browse/FLINK-5267
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0, 1.3.0
> Environment: DC/OS 1.8
>Reporter: Mischa Krüger
>Assignee: Sachin Goel
> Fix For: 1.2.0, 1.3.0
>
>
> Latest master build: I ran the quickstart example successfully and wanted 
> to see the TM logs, but they can't be scrolled. Downloading the log is luckily 
> still possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5368) Let Kafka consumer show something when it fails to read one topic out of topic list

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15804525#comment-15804525
 ] 

ASF GitHub Bot commented on FLINK-5368:
---

Github user HungUnicorn commented on a diff in the pull request:

https://github.com/apache/flink/pull/3036#discussion_r94943539
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 ---
@@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List topics, 
KeyedDeserializationSchema d
if (partitionsForTopic != null) {

partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
}
+   else{
+   LOG.info("Unable to retrieve any 
partitions for the requested topic: {}", topic);
+   }
}
}
 
-   if (partitions.isEmpty()) {
-   throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics);
-   }
--- End diff --

Throwing an exception fits our case better than logging an INFO message, but 
@rmetzger suggested that some cases might be fine and that logging at INFO is more 
general, if I understood correctly.
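
To make the trade-off concrete, here is a hedged Java sketch of one possible middle ground (log each topic that yields no partitions, but still fail fast when none of the requested topics can be resolved); the class and helper names are illustrative, not the connector's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: warns per missing topic, throws only if nothing was found at all.
class TopicPartitionCheckSketch {
    private static final Logger LOG = LoggerFactory.getLogger(TopicPartitionCheckSketch.class);

    interface PartitionLookup {
        List<String> partitionsFor(String topic);
    }

    static List<String> resolvePartitions(List<String> topics, PartitionLookup lookup) {
        List<String> partitions = new ArrayList<>();
        for (String topic : topics) {
            List<String> partitionsForTopic = lookup.partitionsFor(topic);
            if (partitionsForTopic != null && !partitionsForTopic.isEmpty()) {
                partitions.addAll(partitionsForTopic);
            } else {
                LOG.warn("Unable to retrieve any partitions for the requested topic: {}", topic);
            }
        }
        if (partitions.isEmpty()) {
            throw new RuntimeException(
                "Unable to retrieve any partitions for the requested topics " + topics);
        }
        return partitions;
    }
}
```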


> Let Kafka consumer show something when it fails to read one topic out of 
> topic list
> ---
>
> Key: FLINK-5368
> URL: https://issues.apache.org/jira/browse/FLINK-5368
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Sendoh
>Assignee: Sendoh
>Priority: Critical
>
> As a developer reading data from many topics, I want the Kafka consumer to 
> show something if any topic is not available. The motivation is that we read many 
> topics as a list at one time, and sometimes we fail to recognize that one or 
> two topics' names have been changed or deprecated, and the Flink Kafka connector 
> doesn't show the error.
> My proposed change would be either to throw a RuntimeException or to use 
> LOG.error(topic + "doesn't have any partition") if partitionsForTopic is null 
> in this function: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java#L208
> Any suggestion is welcome.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3036: [FLINK-5368] Log msg if kafka topic doesn't have a...

2017-01-06 Thread HungUnicorn
Github user HungUnicorn commented on a diff in the pull request:

https://github.com/apache/flink/pull/3036#discussion_r94943539
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 ---
@@ -208,13 +208,12 @@ public FlinkKafkaConsumer09(List topics, 
KeyedDeserializationSchema d
if (partitionsForTopic != null) {

partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
}
+   else{
+   LOG.info("Unable to retrieve any 
partitions for the requested topic: {}", topic);
+   }
}
}
 
-   if (partitions.isEmpty()) {
-   throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics);
-   }
--- End diff --

Throwing an exception fits our case better than logging an INFO message, but 
@rmetzger suggested that some cases might be fine and that logging at INFO is more 
general, if I understood correctly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3065: [hotfix] [table] Enable all CalciteConfigBuilderTest test...

2017-01-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3065
  
Since the tests appear to pass on travis, +1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805194#comment-15805194
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3057
  
@EronWright thanks a lot for this fix, I am looking through it now.

From the description in the JIRAs, I take it that this adds the logic that 
reads custom JAAS security configurations and uses Flink's internal one (Hadoop 
UGI based) as a fallback default?

This should be relevant for the `master` and the 1.2 release branch, 
correct?
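
For readers following along, a minimal Java sketch of the fallback pattern being described; this is an illustration of the idea only, not the code in this PR:

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Prefer entries from a user-supplied JAAS Configuration and fall back to
// Flink-provided defaults when an application name is not defined there.
public class MergingJaasConfiguration extends Configuration {

    private final Configuration userSupplied; // may be null if none was provided
    private final Configuration defaults;

    public MergingJaasConfiguration(Configuration userSupplied, Configuration defaults) {
        this.userSupplied = userSupplied;
        this.defaults = defaults;
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        if (userSupplied != null) {
            AppConfigurationEntry[] entries = userSupplied.getAppConfigurationEntry(name);
            if (entries != null && entries.length > 0) {
                return entries;
            }
        }
        return defaults.getAppConfigurationEntry(name);
    }
}
```

Installed via `Configuration.setConfiguration(...)`, such a wrapper would let the ZooKeeper/Kafka login modules see user-supplied entries while the defaults still apply everywhere else.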



> Rework JAAS configuration to support user-supplied entries
> --
>
> Key: FLINK-5364
> URL: https://issues.apache.org/jira/browse/FLINK-5364
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Critical
>  Labels: kerberos, security
>
> Recent issues (see linked) have brought to light a critical deficiency in the 
> handling of JAAS configuration.   
> 1. the MapR distribution relies on an explicit JAAS conf, rather than 
> in-memory conf used by stock Hadoop.
> 2. the ZK/Kafka/Hadoop security configuration is supposed to be independent 
> (one can enable each element separately) but isn't.
> Perhaps we should rework the JAAS conf code to merge any user-supplied 
> configuration with our defaults, rather than using an all-or-nothing 
> approach.   
> We should also address some recent regressions:
> 1. The HadoopSecurityContext should be installed regardless of auth mode, to 
> login with UserGroupInformation, which:
> - handles the HADOOP_USER_NAME variable.
> - installs an OS-specific user principal (from UnixLoginModule etc.) 
> unrelated to Kerberos.
> - picks up the HDFS/HBASE delegation tokens.
> 2. Fix the use of alternative authentication methods - delegation tokens and 
> Kerberos ticket cache.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3057: [FLINK-5364] Rework JAAS configuration to support user-su...

2017-01-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3057
  
@EronWright thanks a lot for this fix, I am looking through it now.

From the description in the JIRAs, I take it that this adds the logic that 
reads custom JAAS security configurations and uses Flink's internal one (Hadoop 
UGI based) as a fallback default?

This should be relevant for the `master` and the 1.2 release branch, 
correct?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3059: [docs] Clarify restart strategy defaults set by checkpoin...

2017-01-06 Thread rehevkor5
Github user rehevkor5 commented on the issue:

https://github.com/apache/flink/pull/3059
  
The changes so far are docs-only, so the CI failure must be unrelated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-3746) WebRuntimeMonitorITCase.testNoCopyFromJar failing intermittently

2017-01-06 Thread Boris Osipov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boris Osipov closed FLINK-3746.
---
Resolution: Fixed

Was fixed with FLINK-4255

> WebRuntimeMonitorITCase.testNoCopyFromJar failing intermittently
> 
>
> Key: FLINK-3746
> URL: https://issues.apache.org/jira/browse/FLINK-3746
> Project: Flink
>  Issue Type: Bug
>Reporter: Todd Lisonbee
>Assignee: Boris Osipov
>Priority: Minor
>  Labels: flaky-test
>
> Test failed randomly in Travis,
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/122624299/log.txt
> Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 13.127 sec 
> <<< FAILURE! - in org.apache.flink.runtime.webmonitor.WebRuntimeMonitorITCase
> testNoCopyFromJar(org.apache.flink.runtime.webmonitor.WebRuntimeMonitorITCase)
>   Time elapsed: 0.124 sec  <<< FAILURE!
> java.lang.AssertionError: expected:<200 OK> but was:<503 Service Unavailable>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitorITCase.testNoCopyFromJar(WebRuntimeMonitorITCase.java:456)
> Results :
> Failed tests: 
>   WebRuntimeMonitorITCase.testNoCopyFromJar:456 expected:<200 OK> but 
> was:<503 Service Unavailable>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5178) allow BlobCache touse a distributed file system irrespective of the HA mode

2017-01-06 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-5178:
---
Description: After FLINK-5129, high availability (HA) mode adds the ability 
for the BlobCache instances at the task managers to download blobs directly 
from the distributed file system. It would be nice if this also worked in 
non-HA mode.  (was: After FLINK-5129, high availability (HA) mode adds the 
ability for the BlobCache instances at the task managers to download blobs 
directly from the distributed file system. It would be nice if this also worked 
in non-HA mode and BLOB_STORAGE_DIRECTORY_KEY may point to a distributed file 
system.)
Summary: allow BlobCache touse a distributed file system irrespective 
of the HA mode  (was: allow BLOB_STORAGE_DIRECTORY_KEY to point to a 
distributed file system)

> allow BlobCache touse a distributed file system irrespective of the HA mode
> ---
>
> Key: FLINK-5178
> URL: https://issues.apache.org/jira/browse/FLINK-5178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> After FLINK-5129, high availability (HA) mode adds the ability for the 
> BlobCache instances at the task managers to download blobs directly from the 
> distributed file system. It would be nice if this also worked in non-HA mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5178) allow BlobCache to use a distributed file system irrespective of the HA mode

2017-01-06 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-5178:
---
Summary: allow BlobCache to use a distributed file system irrespective of 
the HA mode  (was: allow BlobCache touse a distributed file system irrespective 
of the HA mode)

> allow BlobCache to use a distributed file system irrespective of the HA mode
> 
>
> Key: FLINK-5178
> URL: https://issues.apache.org/jira/browse/FLINK-5178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> After FLINK-5129, high availability (HA) mode adds the ability for the 
> BlobCache instances at the task managers to download blobs directly from the 
> distributed file system. It would be nice if this also worked in non-HA mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805207#comment-15805207
 ] 

ASF GitHub Bot commented on FLINK-5355:
---

GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/3078

[FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming 
Connector

My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

```
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a 
ProvisionedThroughputException: performing an exponential backoff and retrying 
a finite number of times before throwing an exception.
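
To illustrate the proposed handling, a small Java sketch of a bounded exponential backoff with jitter; the constants, names, and the generic retry wrapper are assumptions for this example, not the connector's actual configuration:

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch only: retry a failed call with exponential backoff a finite number
// of times, then rethrow the last exception.
class BackoffRetrySketch {

    interface RecordsCall<T> {
        T get() throws Exception;
    }

    static <T> T getRecordsWithBackoff(RecordsCall<T> call, int maxRetries,
                                       long baseMillis, long maxMillis) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.get();
            } catch (Exception e) { // e.g. a 5xx AmazonKinesisException
                last = e;
                long backoff = Math.min(maxMillis, baseMillis * (1L << attempt));
                // add jitter so parallel shard consumers do not retry in lockstep
                Thread.sleep(backoff + ThreadLocalRandom.current().nextLong(backoff / 2 + 1));
            }
        }
        throw last;
    }
}
```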

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink skidder/flink-5355

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3078.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3078


commit 85adf13f62351675e39811b9cb58aa2ac9a9cd4d
Author: Scott Kidder 
Date:   2016-12-16T16:46:54Z

[FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming 
Connector




> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> 

[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-06 Thread skidder
GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/3078

[FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming 
Connector

My Flink job that consumes from a Kinesis stream must be restarted at least 
once daily due to an uncaught AmazonKinesisException when reading from Kinesis. 
The complete stacktrace looks like:

```
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
at 
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
It's interesting that the Kinesis endpoint returned a 500 status code, but 
that's outside the scope of this issue.

I think we can handle this exception in the same manner as a 
ProvisionedThroughputException: performing an exponential backoff and retrying 
a finite number of times before throwing an exception.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink skidder/flink-5355

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3078.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3078


commit 85adf13f62351675e39811b9cb58aa2ac9a9cd4d
Author: Scott Kidder 
Date:   2016-12-16T16:46:54Z

[FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming 
Connector




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5424) Improve Restart Strategy Logging

2017-01-06 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-5424:


 Summary: Improve Restart Strategy Logging
 Key: FLINK-5424
 URL: https://issues.apache.org/jira/browse/FLINK-5424
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Shannon Carey
Assignee: Shannon Carey
Priority: Minor


I'll be submitting a PR which includes some minor improvements to logging 
related to restart strategies.

Specifically, I added a toString so that the log contains better info about 
the failure-rate restart strategy, and I added an explanation in the log when the 
restart strategy is responsible for preventing a job restart (currently, there is 
no indication that the restart strategy had anything to do with it).
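
For illustration, a rough sketch of the kind of change intended (class and field names here are placeholders, not the actual Flink classes):

```
// Sketch only: a toString so the configured strategy shows up readably in the log,
// e.g. when logging "Job X will not be restarted because the restart strategy Y prevented it."
public class FailureRateRestartStrategySketch {
    private final int maxFailuresPerInterval;
    private final long failuresIntervalMillis;
    private final long delayBetweenRestartsMillis;

    public FailureRateRestartStrategySketch(int maxFailuresPerInterval,
                                            long failuresIntervalMillis,
                                            long delayBetweenRestartsMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failuresIntervalMillis = failuresIntervalMillis;
        this.delayBetweenRestartsMillis = delayBetweenRestartsMillis;
    }

    @Override
    public String toString() {
        return "FailureRateRestartStrategy(maxFailuresPerInterval=" + maxFailuresPerInterval
            + ", failuresIntervalMillis=" + failuresIntervalMillis
            + ", delayBetweenRestartsMillis=" + delayBetweenRestartsMillis + ")";
    }
}
```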



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805618#comment-15805618
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3057
  
@EronWright Thanks for explaining. I figured it out concurrently by 
looking at the updated documentation you wrote ;-) The docs are good, helped a 
lot!


> Rework JAAS configuration to support user-supplied entries
> --
>
> Key: FLINK-5364
> URL: https://issues.apache.org/jira/browse/FLINK-5364
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Critical
>  Labels: kerberos, security
>
> Recent issues (see linked) have brought to light a critical deficiency in the 
> handling of JAAS configuration.   
> 1. the MapR distribution relies on an explicit JAAS conf, rather than 
> in-memory conf used by stock Hadoop.
> 2. the ZK/Kafka/Hadoop security configuration is supposed to be independent 
> (one can enable each element separately) but isn't.
> Perhaps we should rework the JAAS conf code to merge any user-supplied 
> configuration with our defaults, rather than using an all-or-nothing 
> approach.   
> We should also address some recent regressions:
> 1. The HadoopSecurityContext should be installed regardless of auth mode, to 
> login with UserGroupInformation, which:
> - handles the HADOOP_USER_NAME variable.
> - installs an OS-specific user principal (from UnixLoginModule etc.) 
> unrelated to Kerberos.
> - picks up the HDFS/HBASE delegation tokens.
> 2. Fix the use of alternative authentication methods - delegation tokens and 
> Kerberos ticket cache.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3057: [FLINK-5364] Rework JAAS configuration to support user-su...

2017-01-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3057
  
@EronWright Thanks for explaining. I figured it out concurrently by 
looking at the updated documentation you wrote ;-) The docs are good, helped a 
lot!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805600#comment-15805600
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95009232
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -71,163 +64,93 @@
 */
public static void install(SecurityConfiguration config) throws 
Exception {
 
-   if (!config.securityIsEnabled()) {
-   // do not perform any initialization if no Kerberos 
crendetails are provided
-   return;
-   }
-
-   // establish the JAAS config
-   JaasConfiguration jaasConfig = new 
JaasConfiguration(config.keytab, config.principal);
-   
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-   populateSystemSecurityProperties(config.flinkConf);
-
-   // establish the UGI login user
-   UserGroupInformation.setConfiguration(config.hadoopConf);
-
-   // only configure Hadoop security if we have security enabled
-   if (UserGroupInformation.isSecurityEnabled()) {
-
-   final UserGroupInformation loginUser;
-
-   if (config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
-   String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
-
-   
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-   loginUser = UserGroupInformation.getLoginUser();
-
-   // supplement with any available tokens
-   String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-   if (fileLocation != null) {
-   /*
-* Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
-* used in the context of reading the stored 
tokens from UGI.
-* Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-* loginUser.addCredentials(cred);
-   */
-   try {
-   Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-   File.class, 
org.apache.hadoop.conf.Configuration.class);
-   Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-   config.hadoopConf);
-   Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
-   Credentials.class);
-   
addCredentialsMethod.invoke(loginUser, cred);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-   }
-   } else {
-   // login with current user credentials (e.g. 
ticket cache)
-   try {
-   //Use reflection API to get the login 
user object
-   
//UserGroupInformation.loginUserFromSubject(null);
-   Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-   Subject subject = null;
-   loginUserFromSubjectMethod.invoke(null, 
subject);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-
-   // note that the stored tokens are read 
automatically
-   loginUser = UserGroupInformation.getLoginUser();
+   // install the security modules
+   List<SecurityModule> modules = new ArrayList<SecurityModule>();
+   try {
+   for (Class<? extends SecurityModule> moduleClass : 
config.getSecurityModules()) {
+  

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805608#comment-15805608
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95010038
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Responsible for installing a process-wide JAAS configuration.
+ * 
+ * The installed configuration combines login modules based on:
+ * - the user-supplied JAAS configuration file, if any
+ * - a Kerberos keytab, if configured
+ * - any cached Kerberos credentials from the current environment
+ * 
+ * The module also installs a default JAAS config file (if necessary) for
+ * compatibility with ZK and Kafka.  Note that the JRE actually draws on 
numerous file locations.
+ * See: 
https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
+ * See: 
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ */
+@Internal
+public class JaasModule implements SecurityModule {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JaasModule.class);
+
+   static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = 
"java.security.auth.login.config";
+
+   static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
+
+   private String priorConfigFile;
+   private javax.security.auth.login.Configuration priorConfig;
+
+   private DynamicConfiguration currentConfig;
+
+   @Override
+   public void install(SecurityUtils.SecurityConfiguration securityConfig) 
{
+
+   // ensure that a config file is always defined, for 
compatibility with
+   // ZK and Kafka which check for the system property and 
existence of the file
+   priorConfigFile = 
System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+   if (priorConfigFile == null) {
+   File configFile = generateDefaultConfigFile();
+   System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
configFile.getAbsolutePath());
+   }
+
+   // read the JAAS configuration file
+   priorConfig = 
javax.security.auth.login.Configuration.getConfiguration();
+
+   // construct a dynamic JAAS configuration
+   currentConfig = new DynamicConfiguration(priorConfig);
+
+   // wire up the configured JAAS login contexts to use the krb5 
entries
+   AppConfigurationEntry[] krb5Entries = 
getAppConfigurationEntries(securityConfig);
+   if(krb5Entries != null) {
+   for (String app : 
securityConfig.getLoginContextNames()) {
+   currentConfig.addAppConfigurationEntry(app, 
krb5Entries);
+   }
+   }
+
+   
javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+   }
+
+   @Override
+   public void uninstall() {
+   if(priorConfigFile != null) {
+   

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805607#comment-15805607
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95007014
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -71,163 +64,93 @@
 */
public static void install(SecurityConfiguration config) throws 
Exception {
 
-   if (!config.securityIsEnabled()) {
-   // do not perform any initialization if no Kerberos 
crendetails are provided
-   return;
-   }
-
-   // establish the JAAS config
-   JaasConfiguration jaasConfig = new 
JaasConfiguration(config.keytab, config.principal);
-   
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-   populateSystemSecurityProperties(config.flinkConf);
-
-   // establish the UGI login user
-   UserGroupInformation.setConfiguration(config.hadoopConf);
-
-   // only configure Hadoop security if we have security enabled
-   if (UserGroupInformation.isSecurityEnabled()) {
-
-   final UserGroupInformation loginUser;
-
-   if (config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
-   String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
-
-   
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-   loginUser = UserGroupInformation.getLoginUser();
-
-   // supplement with any available tokens
-   String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-   if (fileLocation != null) {
-   /*
-* Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
-* used in the context of reading the stored 
tokens from UGI.
-* Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-* loginUser.addCredentials(cred);
-   */
-   try {
-   Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-   File.class, 
org.apache.hadoop.conf.Configuration.class);
-   Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-   config.hadoopConf);
-   Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
-   Credentials.class);
-   
addCredentialsMethod.invoke(loginUser, cred);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-   }
-   } else {
-   // login with current user credentials (e.g. 
ticket cache)
-   try {
-   //Use reflection API to get the login 
user object
-   
//UserGroupInformation.loginUserFromSubject(null);
-   Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-   Subject subject = null;
-   loginUserFromSubjectMethod.invoke(null, 
subject);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-
-   // note that the stored tokens are read 
automatically
-   loginUser = UserGroupInformation.getLoginUser();
+   // install the security modules
+   List<SecurityModule> modules = new ArrayList<SecurityModule>();
--- End diff --

Can you use `new ArrayList<>()` here? In general, it would be nice to make 
a pass 
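
For reference, the suggested line would read as follows (the element type is taken from the surrounding diff):

```
// With the diamond operator the element type is inferred from the declaration:
List<SecurityModule> modules = new ArrayList<>();
```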

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805601#comment-15805601
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r94997957
  
--- Diff: docs/internals/flink_security.md ---
@@ -24,64 +24,109 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document briefly describes how Flink security works in the context of 
various deployment mechanism (Standalone/Cluster vs YARN) 
-and the connectors that participates in Flink Job execution stage. This 
documentation can be helpful for both administrators and developers 
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of 
various deployment mechanisms (Standalone, YARN, or Mesos), 
+filesystems, connectors, and state backends.
 
 ## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors 
(e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
 
-The primary goal of Flink security model is to enable secure data access 
for jobs within a cluster via connectors. In a production deployment scenario, 
-streaming jobs are understood to run for longer period of time 
(days/weeks/months) and the system must be  able to authenticate against secure 
-data sources throughout the life of the job. The current implementation 
supports running Flink clusters (Job Manager/Task Manager/Jobs) under the 
-context of a Kerberos identity based on Keytab credential supplied during 
deployment time. Any jobs submitted will continue to run in the identity of the 
cluster.
+In a production deployment scenario, streaming jobs are understood to run 
for long periods of time (days/weeks/months) and be able to authenticate to 
secure 
+data sources throughout the life of the job.  Kerberos keytabs do not 
expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job 
Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens.   Keep in mind that all jobs share the 
credential configured for a given cluster.
 
 ## How Flink Security works
-Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), 
Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. 
-A Job program may use one or more connectors (Kafka, HDFS, Cassandra, 
Flume, Kinesis etc.,) and each connector may have a specific security 
-requirements (Kerberos, database based, SSL/TLS, custom etc.,). While 
satisfying the security requirements for all the connectors evolves over a 
period 
-of time, at this time of writing, the following connectors/services are 
tested for Kerberos/Keytab based security.
+In concept, a Flink program may use first- or third-party connectors 
(Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary 
authentication methods (Kerberos, SSL/TLS, username/password, etc.).  While 
satisfying the security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only.  The 
following services and connectors are tested for Kerberos authentication:
 
-- Kafka (0.9)
+- Kafka (0.9+)
 - HDFS
+- HBase
 - ZooKeeper
 
-Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI 
is a static implementation that takes care of handling Kerberos authentication. 
The Flink bootstrap implementation
-(JM/TM/CLI) takes care of instantiating UGI with the appropriate security 
credentials to establish the necessary security context.
+Note that it is possible to enable the use of Kerberos independently for 
each service or connector.  For example, the user may enable 
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, 
or vice versa. The shared element is the configuration of 
+Kerberos credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing 
`org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup.  The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to 
establish a process-wide *login user* context.   The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and 
YARN.
+

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805604#comment-15805604
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95009675
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Responsible for installing a Hadoop login user.
+ */
+public class HadoopModule implements SecurityModule {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopModule.class);
+
+   UserGroupInformation loginUser;
+
+   @Override
+   public void install(SecurityUtils.SecurityConfiguration securityConfig) 
{
+
+   
UserGroupInformation.setConfiguration(securityConfig.getHadoopConfiguration());
+
+   try {
+   if (UserGroupInformation.isSecurityEnabled() &&
+   
!StringUtils.isBlank(securityConfig.getKeytab()) && 
!StringUtils.isBlank(securityConfig.getPrincipal())) {
+   String keytabPath = (new 
File(securityConfig.getKeytab())).getAbsolutePath();
+
+   
UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), 
keytabPath);
+
+   loginUser = UserGroupInformation.getLoginUser();
+
+   // supplement with any available tokens
+   String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+   if (fileLocation != null) {
+   /*
+* Use reflection API since the API 
semantics are not available in Hadoop1 profile. Below APIs are
+* used in the context of reading the 
stored tokens from UGI.
+* Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+* loginUser.addCredentials(cred);
+   */
+   try {
+   Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+   File.class, 
org.apache.hadoop.conf.Configuration.class);
+   Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+   
securityConfig.getHadoopConfiguration());
+   Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
+   Credentials.class);
+   
addCredentialsMethod.invoke(loginUser, cred);
+   } catch (NoSuchMethodException e) {
+   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
+   } catch (InvocationTargetException e) {
+ 

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805603#comment-15805603
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95007256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -71,163 +64,93 @@
 */
public static void install(SecurityConfiguration config) throws 
Exception {
 
-   if (!config.securityIsEnabled()) {
-   // do not perform any initialization if no Kerberos 
crendetails are provided
-   return;
-   }
-
-   // establish the JAAS config
-   JaasConfiguration jaasConfig = new 
JaasConfiguration(config.keytab, config.principal);
-   
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-   populateSystemSecurityProperties(config.flinkConf);
-
-   // establish the UGI login user
-   UserGroupInformation.setConfiguration(config.hadoopConf);
-
-   // only configure Hadoop security if we have security enabled
-   if (UserGroupInformation.isSecurityEnabled()) {
-
-   final UserGroupInformation loginUser;
-
-   if (config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
-   String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
-
-   
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-   loginUser = UserGroupInformation.getLoginUser();
-
-   // supplement with any available tokens
-   String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-   if (fileLocation != null) {
-   /*
-* Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
-* used in the context of reading the stored 
tokens from UGI.
-* Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-* loginUser.addCredentials(cred);
-   */
-   try {
-   Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-   File.class, 
org.apache.hadoop.conf.Configuration.class);
-   Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-   config.hadoopConf);
-   Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
-   Credentials.class);
-   
addCredentialsMethod.invoke(loginUser, cred);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-   }
-   } else {
-   // login with current user credentials (e.g. 
ticket cache)
-   try {
-   //Use reflection API to get the login 
user object
-   
//UserGroupInformation.loginUserFromSubject(null);
-   Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-   Subject subject = null;
-   loginUserFromSubjectMethod.invoke(null, 
subject);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-
-   // note that the stored tokens are read 
automatically
-   loginUser = UserGroupInformation.getLoginUser();
+   // install the security modules
+   List<SecurityModule> modules = new ArrayList<SecurityModule>();
+   try {
+   for (Class<? extends SecurityModule> moduleClass : 
config.getSecurityModules()) {
+  

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805605#comment-15805605
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95008182
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -71,163 +64,93 @@
 */
public static void install(SecurityConfiguration config) throws 
Exception {
 
-   if (!config.securityIsEnabled()) {
-   // do not perform any initialization if no Kerberos 
crendetails are provided
-   return;
-   }
-
-   // establish the JAAS config
-   JaasConfiguration jaasConfig = new 
JaasConfiguration(config.keytab, config.principal);
-   
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-   populateSystemSecurityProperties(config.flinkConf);
-
-   // establish the UGI login user
-   UserGroupInformation.setConfiguration(config.hadoopConf);
-
-   // only configure Hadoop security if we have security enabled
-   if (UserGroupInformation.isSecurityEnabled()) {
-
-   final UserGroupInformation loginUser;
-
-   if (config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
-   String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
-
-   
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-   loginUser = UserGroupInformation.getLoginUser();
-
-   // supplement with any available tokens
-   String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-   if (fileLocation != null) {
-   /*
-* Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
-* used in the context of reading the stored 
tokens from UGI.
-* Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-* loginUser.addCredentials(cred);
-   */
-   try {
-   Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-   File.class, 
org.apache.hadoop.conf.Configuration.class);
-   Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-   config.hadoopConf);
-   Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
-   Credentials.class);
-   
addCredentialsMethod.invoke(loginUser, cred);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-   }
-   } else {
-   // login with current user credentials (e.g. 
ticket cache)
-   try {
-   //Use reflection API to get the login 
user object
-   
//UserGroupInformation.loginUserFromSubject(null);
-   Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-   Subject subject = null;
-   loginUserFromSubjectMethod.invoke(null, 
subject);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-
-   // note that the stored tokens are read 
automatically
-   loginUser = UserGroupInformation.getLoginUser();
+   // install the security modules
+   List<SecurityModule> modules = new ArrayList<SecurityModule>();
+   try {
+   for (Class<? extends SecurityModule> moduleClass : 
config.getSecurityModules()) {
+  

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805606#comment-15805606
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95007601
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -71,163 +64,93 @@
 */
public static void install(SecurityConfiguration config) throws 
Exception {
 
-   if (!config.securityIsEnabled()) {
-   // do not perform any initialization if no Kerberos 
crendetails are provided
-   return;
-   }
-
-   // establish the JAAS config
-   JaasConfiguration jaasConfig = new 
JaasConfiguration(config.keytab, config.principal);
-   
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-   populateSystemSecurityProperties(config.flinkConf);
-
-   // establish the UGI login user
-   UserGroupInformation.setConfiguration(config.hadoopConf);
-
-   // only configure Hadoop security if we have security enabled
-   if (UserGroupInformation.isSecurityEnabled()) {
-
-   final UserGroupInformation loginUser;
-
-   if (config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
-   String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
-
-   
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-   loginUser = UserGroupInformation.getLoginUser();
-
-   // supplement with any available tokens
-   String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-   if (fileLocation != null) {
-   /*
-* Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
-* used in the context of reading the stored 
tokens from UGI.
-* Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-* loginUser.addCredentials(cred);
-   */
-   try {
-   Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-   File.class, 
org.apache.hadoop.conf.Configuration.class);
-   Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-   config.hadoopConf);
-   Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
-   Credentials.class);
-   
addCredentialsMethod.invoke(loginUser, cred);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-   }
-   } else {
-   // login with current user credentials (e.g. 
ticket cache)
-   try {
-   //Use reflection API to get the login 
user object
-   
//UserGroupInformation.loginUserFromSubject(null);
-   Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-   Subject subject = null;
-   loginUserFromSubjectMethod.invoke(null, 
subject);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-
-   // note that the stored tokens are read 
automatically
-   loginUser = UserGroupInformation.getLoginUser();
+   // install the security modules
+   List<SecurityModule> modules = new ArrayList<SecurityModule>();
+   try {
+   for (Class<? extends SecurityModule> moduleClass : 
config.getSecurityModules()) {
+  

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805609#comment-15805609
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95003470
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Array;
+
+import javax.annotation.Nullable;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A dynamic JAAS configuration.
+ *
+ * Makes it possible to define Application Configuration Entries (ACEs) at 
runtime, building upon
+ * an (optional) underlying configuration.   Entries from the underlying 
configuration take
+ * precedence over dynamic entries.
+ */
+public class DynamicConfiguration extends Configuration {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(DynamicConfiguration.class);
+
+   private final Configuration delegate;
+
+   private final Map<String, AppConfigurationEntry[]> dynamicEntries = new 
HashMap<>();
+
+   /**
+* Create a dynamic configuration.
+* @param delegate an underlying configuration to delegate to, or null.
+ */
+   public DynamicConfiguration(@Nullable Configuration delegate) {
+   this.delegate = delegate;
+   }
+
+   /**
+* Add entries for the given application name.
+ */
+   public void addAppConfigurationEntry(String name, 
AppConfigurationEntry... entry) {
+   final AppConfigurationEntry[] existing = 
dynamicEntries.get(name);
+   final AppConfigurationEntry[] updated;
+   if(existing == null) {
+   updated = Arrays.copyOf(entry, entry.length);
+   }
+   else {
+   updated = merge(existing, entry);
+   }
+   dynamicEntries.put(name, updated);
+   }
+
+   /**
+* Retrieve the AppConfigurationEntries for the specified name
+* from this Configuration.
+*
+* 
+*
+* @param name the name used to index the Configuration.
+*
+* @return an array of AppConfigurationEntries for the specified 
name
+*  from this Configuration, or null if there are no entries
+*  for the specified name
+*/
+   @Override
+   public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+   AppConfigurationEntry[] entry = null;
+   if(delegate != null) {
+   entry = delegate.getAppConfigurationEntry(name);
+   }
+   final AppConfigurationEntry[] existing = 
dynamicEntries.get(name);
+   if(existing != null) {
+   if(entry != null) {
+   entry = merge(entry, existing);
+   }
+   else {
+   entry = Arrays.copyOf(existing, 
existing.length);
+   }
+   }
+   return entry;
+   }
+
+   private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, 
AppConfigurationEntry[] b) {
+   AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + 
b.length);
+   Array.copy(b, 0, merged, a.length, b.length);
--- End diff --

Can we use `System.arraycopy()` here to avoid a dependency on Scala in this 
part of the code?
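
For reference, a plain-JDK version of the merge above would look like this (a sketch mirroring the method in the diff):

```
// Same merge as above, using only the JDK instead of the Scala Array helper.
private static AppConfigurationEntry[] merge(AppConfigurationEntry[] a, AppConfigurationEntry[] b) {
    AppConfigurationEntry[] merged = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, merged, a.length, b.length);
    return merged;
}
```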


> Rework JAAS configuration to support user-supplied entries
> 

[jira] [Commented] (FLINK-5364) Rework JAAS configuration to support user-supplied entries

2017-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15805602#comment-15805602
 ] 

ASF GitHub Bot commented on FLINK-5364:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r94994079
  
--- Diff: docs/internals/flink_security.md ---
@@ -24,64 +24,109 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document briefly describes how Flink security works in the context of 
various deployment mechanism (Standalone/Cluster vs YARN) 
-and the connectors that participates in Flink Job execution stage. This 
documentation can be helpful for both administrators and developers 
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of 
various deployment mechanisms (Standalone, YARN, or Mesos), 
+filesystems, connectors, and state backends.
 
 ## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors 
(e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
 
-The primary goal of Flink security model is to enable secure data access 
for jobs within a cluster via connectors. In a production deployment scenario, 
-streaming jobs are understood to run for longer period of time 
(days/weeks/months) and the system must be  able to authenticate against secure 
-data sources throughout the life of the job. The current implementation 
supports running Flink clusters (Job Manager/Task Manager/Jobs) under the 
-context of a Kerberos identity based on Keytab credential supplied during 
deployment time. Any jobs submitted will continue to run in the identity of the 
cluster.
+In a production deployment scenario, streaming jobs are understood to run 
for long periods of time (days/weeks/months) and be able to authenticate to 
secure 
+data sources throughout the life of the job.  Kerberos keytabs do not 
expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job 
Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens.   Keep in mind that all jobs share the 
credential configured for a given cluster.
--- End diff --

Maybe point out here that this refers to a "Flink Cluster" (a set of 
JobManager/TaskManager processes). One can run different jobs with different 
credentials next to each other in YARN by starting different per-job-clusters 
or Yarn/Mesos sessions.


> Rework JAAS configuration to support user-supplied entries
> --
>
> Key: FLINK-5364
> URL: https://issues.apache.org/jira/browse/FLINK-5364
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Critical
>  Labels: kerberos, security
>
> Recent issues (see linked) have brought to light a critical deficiency in the 
> handling of JAAS configuration.   
> 1. the MapR distribution relies on an explicit JAAS conf, rather than 
> in-memory conf used by stock Hadoop.
> 2. the ZK/Kafka/Hadoop security configuration is supposed to be independent 
> (one can enable each element separately) but isn't.
> Perhaps we should rework the JAAS conf code to merge any user-supplied 
> configuration with our defaults, rather than using an all-or-nothing 
> approach.   
> We should also address some recent regressions:
> 1. The HadoopSecurityContext should be installed regardless of auth mode, to 
> login with UserGroupInformation, which:
> - handles the HADOOP_USER_NAME variable.
> - installs an OS-specific user principal (from UnixLoginModule etc.) 
> unrelated to Kerberos.
> - picks up the HDFS/HBASE delegation tokens.
> 2. Fix the use of alternative authentication methods - delegation tokens and 
> Kerberos ticket cache.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3057: [FLINK-5364] Rework JAAS configuration to support ...

2017-01-06 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95010038
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Responsible for installing a process-wide JAAS configuration.
+ * 
+ * The installed configuration combines login modules based on:
+ * - the user-supplied JAAS configuration file, if any
+ * - a Kerberos keytab, if configured
+ * - any cached Kerberos credentials from the current environment
+ * 
+ * The module also installs a default JAAS config file (if necessary) for
+ * compatibility with ZK and Kafka.  Note that the JRE actually draws on 
numerous file locations.
+ * See: 
https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
+ * See: 
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ */
+@Internal
+public class JaasModule implements SecurityModule {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JaasModule.class);
+
+   static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = 
"java.security.auth.login.config";
+
+   static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";
+
+   private String priorConfigFile;
+   private javax.security.auth.login.Configuration priorConfig;
+
+   private DynamicConfiguration currentConfig;
+
+   @Override
+   public void install(SecurityUtils.SecurityConfiguration securityConfig) 
{
+
+   // ensure that a config file is always defined, for 
compatibility with
+   // ZK and Kafka which check for the system property and 
existence of the file
+   priorConfigFile = 
System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
+   if (priorConfigFile == null) {
+   File configFile = generateDefaultConfigFile();
+   System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
configFile.getAbsolutePath());
+   }
+
+   // read the JAAS configuration file
+   priorConfig = 
javax.security.auth.login.Configuration.getConfiguration();
+
+   // construct a dynamic JAAS configuration
+   currentConfig = new DynamicConfiguration(priorConfig);
+
+   // wire up the configured JAAS login contexts to use the krb5 
entries
+   AppConfigurationEntry[] krb5Entries = 
getAppConfigurationEntries(securityConfig);
+   if(krb5Entries != null) {
+   for (String app : 
securityConfig.getLoginContextNames()) {
+   currentConfig.addAppConfigurationEntry(app, 
krb5Entries);
+   }
+   }
+
+   
javax.security.auth.login.Configuration.setConfiguration(currentConfig);
+   }
+
+   @Override
+   public void uninstall() {
+   if(priorConfigFile != null) {
+   System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
priorConfigFile);
+   } else {
+   System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
+   }
+   
javax.security.auth.login.Configuration.setConfiguration(priorConfig);
 

[GitHub] flink pull request #3057: [FLINK-5364] Rework JAAS configuration to support ...

2017-01-06 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r94997957
  
--- Diff: docs/internals/flink_security.md ---
@@ -24,64 +24,109 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document briefly describes how Flink security works in the context of 
various deployment mechanism (Standalone/Cluster vs YARN) 
-and the connectors that participates in Flink Job execution stage. This 
documentation can be helpful for both administrators and developers 
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of 
various deployment mechanisms (Standalone, YARN, or Mesos), 
+filesystems, connectors, and state backends.
 
 ## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors 
(e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase) 
 
-The primary goal of Flink security model is to enable secure data access 
for jobs within a cluster via connectors. In a production deployment scenario, 
-streaming jobs are understood to run for longer period of time 
(days/weeks/months) and the system must be  able to authenticate against secure 
-data sources throughout the life of the job. The current implementation 
supports running Flink clusters (Job Manager/Task Manager/Jobs) under the 
-context of a Kerberos identity based on Keytab credential supplied during 
deployment time. Any jobs submitted will continue to run in the identity of the 
cluster.
+In a production deployment scenario, streaming jobs are understood to run 
for long periods of time (days/weeks/months) and be able to authenticate to 
secure 
+data sources throughout the life of the job.  Kerberos keytabs do not 
expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job 
Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens.   Keep in mind that all jobs share the 
credential configured for a given cluster.
 
 ## How Flink Security works
-Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), 
Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI. 
-A Job program may use one or more connectors (Kafka, HDFS, Cassandra, 
Flume, Kinesis etc.,) and each connector may have a specific security 
-requirements (Kerberos, database based, SSL/TLS, custom etc.,). While 
satisfying the security requirements for all the connectors evolves over a 
period 
-of time, at this time of writing, the following connectors/services are 
tested for Kerberos/Keytab based security.
+In concept, a Flink program may use first- or third-party connectors 
(Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary 
authentication methods (Kerberos, SSL/TLS, username/password, etc.).  While 
satisfying the security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only.  The 
following services and connectors are tested for Kerberos authentication:
 
-- Kafka (0.9)
+- Kafka (0.9+)
 - HDFS
+- HBase
 - ZooKeeper
 
-Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI 
is a static implementation that takes care of handling Kerberos authentication. 
The Flink bootstrap implementation
-(JM/TM/CLI) takes care of instantiating UGI with the appropriate security 
credentials to establish the necessary security context.
+Note that it is possible to enable the use of Kerberos independently for 
each service or connector.  For example, the user may enable 
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, 
or vice versa. The shared element is the configuration of 
+Kerberos credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing 
`org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup.  The next section describes each security module.
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to 
establish a process-wide *login user* context.   The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and 
YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will 
have whatever Kerberos credential is configured.  Otherwise,
+the login user conveys only the user identity of the OS account that 
launched the cluster.
+
   

[GitHub] flink pull request #3057: [FLINK-5364] Rework JAAS configuration to support ...

2017-01-06 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3057#discussion_r95009232
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
@@ -71,163 +64,93 @@
 */
public static void install(SecurityConfiguration config) throws 
Exception {
 
-   if (!config.securityIsEnabled()) {
-   // do not perform any initialization if no Kerberos 
crendetails are provided
-   return;
-   }
-
-   // establish the JAAS config
-   JaasConfiguration jaasConfig = new 
JaasConfiguration(config.keytab, config.principal);
-   
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
-
-   populateSystemSecurityProperties(config.flinkConf);
-
-   // establish the UGI login user
-   UserGroupInformation.setConfiguration(config.hadoopConf);
-
-   // only configure Hadoop security if we have security enabled
-   if (UserGroupInformation.isSecurityEnabled()) {
-
-   final UserGroupInformation loginUser;
-
-   if (config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
-   String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
-
-   
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
-
-   loginUser = UserGroupInformation.getLoginUser();
-
-   // supplement with any available tokens
-   String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-   if (fileLocation != null) {
-   /*
-* Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
-* used in the context of reading the stored 
tokens from UGI.
-* Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
-* loginUser.addCredentials(cred);
-   */
-   try {
-   Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
-   File.class, 
org.apache.hadoop.conf.Configuration.class);
-   Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
-   config.hadoopConf);
-   Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
-   Credentials.class);
-   
addCredentialsMethod.invoke(loginUser, cred);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-   }
-   } else {
-   // login with current user credentials (e.g. 
ticket cache)
-   try {
-   //Use reflection API to get the login 
user object
-   
//UserGroupInformation.loginUserFromSubject(null);
-   Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
-   Subject subject = null;
-   loginUserFromSubjectMethod.invoke(null, 
subject);
-   } catch (NoSuchMethodException e) {
-   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
-   }
-
-   // note that the stored tokens are read 
automatically
-   loginUser = UserGroupInformation.getLoginUser();
+   // install the security modules
+   List<SecurityModule> modules = new ArrayList<SecurityModule>();
+   try {
+   for (Class<? extends SecurityModule> moduleClass : 
config.getSecurityModules()) {
+   SecurityModule module = 
moduleClass.newInstance();
+   module.install(config);
+   modules.add(module);
}
+   }
+
