[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-08 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475975#comment-15475975
 ] 

ramkrishna.s.vasudevan commented on FLINK-3322:
---

[~ggevay]
Thanks for the inputs. Let me look into that as well. I am just trying to make 
things work; I can open a PR once I feel it is in good shape, maybe early next 
week. This whole week was busy with some personal things, so I could not 
complete it on time.

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than a factor of 
> 10. (It will generate some lookup tables in /tmp on the first run, which 
> takes a few minutes.) (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
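
For illustration, the difference between pooled and GC-managed segments can be 
sketched with a toy pool (a hypothetical class, not Flink's actual 
{{MemoryManager}}; the segment size is made up):
{code}
import java.util.ArrayDeque;

public class SegmentPool {
    private static final int SEGMENT_SIZE = 32 * 1024;
    private final ArrayDeque<byte[]> freeSegments = new ArrayDeque<>();

    /** With pooling, a release/request cycle reuses the same backing array. */
    public byte[] request() {
        byte[] segment = freeSegments.poll();
        // Without preallocation, every request is a fresh allocation and
        // every released segment is left for the GC -- the behavior this
        // issue identifies as the bottleneck in iterative jobs.
        return segment != null ? segment : new byte[SEGMENT_SIZE];
    }

    public void release(byte[] segment) {
        freeSegments.push(segment); // keep the segment instead of dropping it
    }
}
{code}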





[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475827#comment-15475827
 ] 

ASF GitHub Bot commented on FLINK-4546:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2454
  
Hi @fhueske @twalthr, do you have time to take another look? Any advice is 
welcome.


>  Remove STREAM keyword in Stream SQL
> 
>
> Key: FLINK-4546
> URL: https://issues.apache.org/jira/browse/FLINK-4546
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> This issue is about unifying the Batch SQL and Stream SQL grammars, 
> especially removing the STREAM keyword from Stream SQL.
> Detailed discussion on the mailing list: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html
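
As a rough illustration of the proposed grammar change (the query strings are 
invented examples, not taken from the issue):
{code}
// Before unification (Calcite streaming SQL): the STREAM keyword marks a
// streaming query.
String before = "SELECT STREAM user, amount FROM Orders";
// After unification: plain SQL; whether Orders is a bounded or unbounded
// table decides batch vs. streaming semantics, not a keyword in the query.
String after = "SELECT user, amount FROM Orders";
{code}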





[GitHub] flink issue #2454: [FLINK-4546] [table] Remove STREAM keyword in Stream SQL

2016-09-08 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2454
  
Hi @fhueske @twalthr, do you have time to take another look? Any advice is 
welcome.




[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-09-08 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
I have noticed the issue occasionally in the master branch too, but it is very 
inconsistent. I just rebased against the latest master and ran "mvn clean 
verify", and I don't see any errors. I have tried a couple of times to see if 
it reoccurs, but it looks good. Perhaps the issue was addressed by some other 
patches in the master branch? Could you please give it a try?




[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475694#comment-15475694
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
I have noticed the issue occasionally in the master branch too, but it is very 
inconsistent. I just rebased against the latest master and ran "mvn clean 
verify", and I don't see any errors. I have tried a couple of times to see if 
it reoccurs, but it looks good. Perhaps the issue was addressed by some other 
patches in the master branch? Could you please give it a try?


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  
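
For background, keytab-based login on Hadoop-secured services typically goes 
through {{UserGroupInformation}}; a minimal sketch (the principal and keytab 
path are placeholders, and this is not the PR's code):
{code}
import org.apache.hadoop.security.UserGroupInformation;

public class KeytabLoginSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders -- a real deployment would read these from the
        // Flink configuration.
        String principal = "flink/host@EXAMPLE.COM";
        String keytab = "/etc/security/keytabs/flink.keytab";

        // Log in once from the keytab; a long-running service can then
        // re-login before ticket expiry instead of relying on a ticket cache.
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
        System.out.println("Logged in as: "
                + UserGroupInformation.getLoginUser().getUserName());
    }
}
{code}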





[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475107#comment-15475107
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user rekhajoshm commented on the issue:

https://github.com/apache/flink/pull/2060
  
Hi @greghogan, sorry, I lost the thread on this; it is updated with the 
Preconditions check now. Unless I am missing something, enableQuotedStringParsing() 
is present in the subclasses because it is slightly different from the parent 
and involves if-else casting when invoked. But in the case of setCharset it is 
the same for all possible subclasses and does not need to be. Hope you agree. 
Thanks.


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be ASCII, similar to `StringValueParser`. That makes sense, 
> but when constructing the actual instance, no encoding is specified; for 
> example, on line 66:
>    this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever the default platform encoding is. If contents 
> really are always ASCII (I would not count on that, as the parser is used 
> from the CSV reader), it is not a big deal, but it can lead to the usual 
> Latin-1-vs-UTF-8 issues.
> So I think the encoding should be explicitly specified, whatever it may be: 
> the javadocs claim ASCII, so it could be "us-ascii", but it could well be 
> UTF-8 or even ISO-8859-1.
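
A minimal sketch of the fix the report suggests, using the standard java.nio 
charset API (the helper names are invented, and the charset choice is the 
report's open question, not a decision):
{code}
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class StringDecoding {
    // Platform-dependent decoding -- the behavior the report flags:
    static String decodeDefault(byte[] bytes, int startPos, int i) {
        return new String(bytes, startPos + 1, i - startPos - 2);
    }

    // Explicit encoding, e.g. configurable with a sensible default:
    static String decodeExplicit(byte[] bytes, int startPos, int i, Charset cs) {
        return new String(bytes, startPos + 1, i - startPos - 2, cs);
    }
}
// e.g. decodeExplicit(bytes, start, i, StandardCharsets.US_ASCII), or
// StandardCharsets.UTF_8 / StandardCharsets.ISO_8859_1 as the report notes.
{code}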





[GitHub] flink issue #2060: [FLINK-3921] StringParser encoding

2016-09-08 Thread rekhajoshm
Github user rekhajoshm commented on the issue:

https://github.com/apache/flink/pull/2060
  
Hi @greghogan, sorry, I lost the thread on this; it is updated with the 
Preconditions check now. Unless I am missing something, enableQuotedStringParsing() 
is present in the subclasses because it is slightly different from the parent 
and involves if-else casting when invoked. But in the case of setCharset it is 
the same for all possible subclasses and does not need to be. Hope you agree. 
Thanks.




[jira] [Closed] (FLINK-4571) Configurable little parallelism in Gelly drivers

2016-09-08 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-4571.
-
   Resolution: Fixed
Fix Version/s: (was: 1.2.0)

Fixed in bdd3c0d94b2a6cdecb482ee3fdefe082fc1b7c4d

> Configurable little parallelism in Gelly drivers
> 
>
> Key: FLINK-4571
> URL: https://issues.apache.org/jira/browse/FLINK-4571
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Several Gelly library implementations support a configurable "little 
> parallelism", which is important when scaling to large data sets. These 
> algorithms include operators at the beginning and end which process data on 
> the order of the original DataSet, as well as middle operators that exchange 
> 100s or 1000s of times more data. The "little parallelism" should be 
> configurable in the appropriate Gelly drivers in the flink-gelly-examples module.
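
A hedged sketch of the pattern (a self-contained toy job; the split into light 
and heavy stages mirrors the description above, but this is not the actual 
driver code):
{code}
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class LittleParallelismSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        int littleParallelism = 4; // would come from the driver's parameters

        DataSet<Long> input = env.generateSequence(0, 999);

        // Light stage: processes data on the order of the input, so it can
        // run at the smaller "little parallelism".
        DataSet<Tuple2<Long, Long>> annotated = input
            .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Long v) {
                    return new Tuple2<>(v % 100, v);
                }
            })
            .setParallelism(littleParallelism);

        // Heavy stage: the self-join can exchange orders of magnitude more
        // data, so it keeps the full default parallelism.
        DataSet<Long> joined = annotated
            .join(annotated).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long>() {
                @Override
                public Long join(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                    return a.f1 + b.f1;
                }
            });

        System.out.println(joined.count());
    }
}
{code}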





[jira] [Updated] (FLINK-4571) Configurable little parallelism in Gelly drivers

2016-09-08 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-4571:
--
Fix Version/s: 1.2.0

> Configurable little parallelism in Gelly drivers
> 
>
> Key: FLINK-4571
> URL: https://issues.apache.org/jira/browse/FLINK-4571
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Several Gelly library implementations support a configurable "little 
> parallelism", which is important when scaling to large data sets. These 
> algorithms include operators at the beginning and end which process data on 
> the order of the original DataSet, as well as middle operators that exchange 
> 100s or 1000s of times more data. The "little parallelism" should be 
> configurable in the appropriate Gelly drivers in the flink-gelly-examples module.





[jira] [Updated] (FLINK-4257) Handle delegating algorithm change of class

2016-09-08 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-4257:
--
Fix Version/s: 1.2.0

> Handle delegating algorithm change of class
> ---
>
> Key: FLINK-4257
> URL: https://issues.apache.org/jira/browse/FLINK-4257
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> A class created by {{ProxyFactory}} can intercept and reinterpret method 
> calls using its {{MethodHandler}}, but is restricted in that
> * the type of the proxy class cannot be changed
> * method return types must be honored
> We have algorithms such as {{VertexDegree}} and {{TriangleListing}} that 
> change return type depending on configuration, even between single and dual 
> input functions. This can be problematic, e.g. in {{OperatorTranslation}} 
> where we test {{dataSet instanceof SingleInputOperator}} or {{dataSet 
> instanceof TwoInputOperator}}.
> Even simply changing operator can be problematic, e.g. 
> {{MapOperator.translateToDataFlow}} returns {{MapOperatorBase}} whereas 
> {{ReduceOperator.translateToDataFlow}} returns {{SingleInputOperator}}.
> I see two ways to solve these issues. By adding a simple {{NoOpOperator}} 
> that is skipped over during {{OperatorTranslation}} we could wrap all 
> algorithm output and always be proxying the same class.
> Alternatively, making changes only within Gelly, we can append a "no-op" 
> pass-through {{MapFunction}} to any algorithm output which is not a 
> {{SingleInputOperator}}. And {{Delegate}} can also walk the superclass 
> hierarchy such that we are always proxying {{SingleInputOperator}}.
> There is one additional issue. When we call {{DataSet.output}} the delegate's 
> {{MethodHandler}} must reinterpret this call to add itself to the list of 
> sinks.
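
For readers unfamiliar with the proxying machinery: {{ProxyFactory}} and 
{{MethodHandler}} are Javassist classes, and a generic proxy looks roughly 
like this (a sketch of the library usage, not Flink's {{Delegate}} code):
{code}
import java.lang.reflect.Method;
import javassist.util.proxy.MethodHandler;
import javassist.util.proxy.ProxyFactory;

public class ProxySketch {
    public static Object proxyFor(Class<?> superClass, final Object target)
            throws Exception {
        ProxyFactory factory = new ProxyFactory();
        // The proxy's type is fixed at creation time -- the first
        // restriction described above.
        factory.setSuperclass(superClass);
        MethodHandler handler = new MethodHandler() {
            @Override
            public Object invoke(Object self, Method thisMethod,
                                 Method proceed, Object[] args) throws Throwable {
                // Calls can be intercepted and reinterpreted here, but the
                // declared return type of thisMethod must still be honored
                // -- the second restriction.
                return thisMethod.invoke(target, args);
            }
        };
        return factory.create(new Class<?>[0], new Object[0], handler);
    }
}
{code}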





[jira] [Closed] (FLINK-4522) Gelly link broken in homepage

2016-09-08 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-4522.
-
Resolution: Fixed

Fixed in 95ad865bd9d7121c80ecd4d1ed92e2912052a502

> Gelly link broken in homepage
> -
>
> Key: FLINK-4522
> URL: https://issues.apache.org/jira/browse/FLINK-4522
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Gelly
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Vasia Kalavri
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> The link to the Gelly documentation is broken on the Flink homepage. The link 
> points to "docs/apis/batch/libs/gelly.md", which has been removed. Since this 
> link might be present in other places as well, e.g. slides, training 
> materials, etc., we should redirect to the new location of the Gelly docs.





[jira] [Closed] (FLINK-4257) Handle delegating algorithm change of class

2016-09-08 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-4257.
-
Resolution: Fixed

Fixed in 8210ff468d64fc50520011fc6fed9909d2a6b89a

> Handle delegating algorithm change of class
> ---
>
> Key: FLINK-4257
> URL: https://issues.apache.org/jira/browse/FLINK-4257
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> A class created by {{ProxyFactory}} can intercept and reinterpret method 
> calls using its {{MethodHandler}}, but is restricted in that
> * the type of the proxy class cannot be changed
> * method return types must be honored
> We have algorithms such as {{VertexDegree}} and {{TriangleListing}} that 
> change return type depending on configuration, even between single and dual 
> input functions. This can be problematic, e.g. in {{OperatorTranslation}} 
> where we test {{dataSet instanceof SingleInputOperator}} or {{dataSet 
> instanceof TwoInputOperator}}.
> Even simply changing operator can be problematic, e.g. 
> {{MapOperator.translateToDataFlow}} returns {{MapOperatorBase}} whereas 
> {{ReduceOperator.translateToDataFlow}} returns {{SingleInputOperator}}.
> I see two ways to solve these issues. By adding a simple {{NoOpOperator}} 
> that is skipped over during {{OperatorTranslation}} we could wrap all 
> algorithm output and always be proxying the same class.
> Alternatively, making changes only within Gelly, we can append a "no-op" 
> pass-through {{MapFunction}} to any algorithm output which is not a 
> {{SingleInputOperator}}. And {{Delegate}} can also walk the superclass 
> hierarchy such that we are always proxying {{SingleInputOperator}}.
> There is one additional issue. When we call {{DataSet.output}} the delegate's 
> {{MethodHandler}} must reinterpret this call to add itself to the list of 
> sinks.





[jira] [Updated] (FLINK-4522) Gelly link broken in homepage

2016-09-08 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-4522:
--
Fix Version/s: 1.2.0

> Gelly link broken in homepage
> -
>
> Key: FLINK-4522
> URL: https://issues.apache.org/jira/browse/FLINK-4522
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Gelly
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Vasia Kalavri
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> The link to the Gelly documentation is broken on the Flink homepage. The link 
> points to "docs/apis/batch/libs/gelly.md", which has been removed. Since this 
> link might be present in other places as well, e.g. slides, training 
> materials, etc., we should redirect to the new location of the Gelly docs.





[jira] [Commented] (FLINK-4257) Handle delegating algorithm change of class

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475031#comment-15475031
 ] 

ASF GitHub Bot commented on FLINK-4257:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2474


> Handle delegating algorithm change of class
> ---
>
> Key: FLINK-4257
> URL: https://issues.apache.org/jira/browse/FLINK-4257
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> A class created by {{ProxyFactory}} can intercept and reinterpret method 
> calls using its {{MethodHandler}}, but is restricted in that
> * the type of the proxy class cannot be changed
> * method return types must be honored
> We have algorithms such as {{VertexDegree}} and {{TriangleListing}} that 
> change return type depending on configuration, even between single and dual 
> input functions. This can be problematic, e.g. in {{OperatorTranslation}} 
> where we test {{dataSet instanceof SingleInputOperator}} or {{dataSet 
> instanceof TwoInputOperator}}.
> Even simply changing operator can be problematic, e.g. 
> {{MapOperator.translateToDataFlow}} returns {{MapOperatorBase}} whereas 
> {{ReduceOperator.translateToDataFlow}} returns {{SingleInputOperator}}.
> I see two ways to solve these issues. By adding a simple {{NoOpOperator}} 
> that is skipped over during {{OperatorTranslation}} we could wrap all 
> algorithm output and always be proxying the same class.
> Alternatively, making changes only within Gelly, we can append a "no-op" 
> pass-through {{MapFunction}} to any algorithm output which is not a 
> {{SingleInputOperator}}. And {{Delegate}} can also walk the superclass 
> hierarchy such that we are always proxying {{SingleInputOperator}}.
> There is one additional issue. When we call {{DataSet.output}} the delegate's 
> {{MethodHandler}} must reinterpret this call to add itself to the list of 
> sinks.





[jira] [Commented] (FLINK-4571) Configurable little parallelism in Gelly drivers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475029#comment-15475029
 ] 

ASF GitHub Bot commented on FLINK-4571:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2475


> Configurable little parallelism in Gelly drivers
> 
>
> Key: FLINK-4571
> URL: https://issues.apache.org/jira/browse/FLINK-4571
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Several Gelly library implementations support a configurable "little 
> parallelism", which is important when scaling to large data sets. These 
> algorithms include operators at the beginning and end which process data on 
> the order of the original DataSet, as well as middle operators that exchange 
> 100s or 1000s of times more data. The "little parallelism" should be 
> configurable in the appropriate Gelly drivers in the flink-gelly-examples module.





[jira] [Commented] (FLINK-4522) Gelly link broken in homepage

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475030#comment-15475030
 ] 

ASF GitHub Bot commented on FLINK-4522:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2464


> Gelly link broken in homepage
> -
>
> Key: FLINK-4522
> URL: https://issues.apache.org/jira/browse/FLINK-4522
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Gelly
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Vasia Kalavri
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> The link to the Gelly documentation is broken on the Flink homepage. The link 
> points to "docs/apis/batch/libs/gelly.md", which has been removed. Since this 
> link might be present in other places as well, e.g. slides, training 
> materials, etc., we should redirect to the new location of the Gelly docs.





[GitHub] flink pull request #2475: [FLINK-4571] [gelly] Configurable little paralleli...

2016-09-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2475




[GitHub] flink pull request #2474: [FLINK-4257] [gelly] Handle delegating algorithm c...

2016-09-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2474




[GitHub] flink pull request #2464: [FLINK-4522] [docs] Gelly link broken in homepage

2016-09-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2464




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474985#comment-15474985
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user Xazax-hun commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78087961
  
--- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase;
+import org.apache.flink.api.java.typeutils.runtime.GenTypeComparatorProxy;
+import org.apache.flink.util.InstantiationUtil;
+
+public final class ${className} extends CompositeTypeComparator implements 
java.io.Serializable {
--- End diff --

Janino ignores generics. If I want generics, I need to validate the code with 
Java as well to be sure that I get it right, so I decided not to bother with it 
for now.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
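
The gap the proposal targets can be seen by contrasting the two access styles 
(a schematic sketch, not the PR's generated output):
{code}
import java.lang.reflect.Field;

public class AccessStyles {
    public int value;

    // Reflective access, as in the current PojoSerializer [2]:
    static int readReflective(AccessStyles pojo) throws Exception {
        Field f = AccessStyles.class.getField("value");
        return (Integer) f.get(pojo); // boxing plus reflective dispatch
    }

    // What generated code can emit instead: a direct field read that the
    // JIT can inline.
    static int readDirect(AccessStyles pojo) {
        return pojo.value;
    }
}
{code}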





[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread Xazax-hun
Github user Xazax-hun commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78087961
  
--- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase;
+import org.apache.flink.api.java.typeutils.runtime.GenTypeComparatorProxy;
+import org.apache.flink.util.InstantiationUtil;
+
+public final class ${className} extends CompositeTypeComparator implements 
java.io.Serializable {
--- End diff --

Janino ignores generics. If I want generics, I need to validate the code with 
Java as well to be sure that I get it right, so I decided not to bother with it 
for now.




[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474979#comment-15474979
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user Xazax-hun commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78087704
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private final Class clazz;
+   private final Field[] refFields;
+   private final TypeSerializer[] fieldSerializers;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoSerializerGenerator(
+   Class clazz,
+   TypeSerializer[] fields,
+   Field[] reflectiveFields,
+   ExecutionConfig config) {
+   this.clazz = checkNotNull(clazz);
+   this.refFields = checkNotNull(reflectiveFields);
+   this.fieldSerializers = checkNotNull(fields);
+   this.config = checkNotNull(config);
+   for (int i = 0; i < this.refFields.length; i++) {
+   this.refFields[i].setAccessible(true);
+   }
+   }
+
+   public TypeSerializer createSerializer()  {
+   final String className = clazz.getCanonicalName().replace('.', 
'_') + "_GeneratedSerializer";
+   final String fullClassName = packageName + "." + className;
+   Class serializerClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if(config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeSerializerProxy<>(clazz, 
fullClassName, code, fieldSerializers, config);
+   }
+   try {
+   serializerClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
+   }
+   catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + className, e);
+   }
+   Constructor[] ctors = serializerClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeSerializer) ctors[0].newInstance(new 
Object[]{clazz, fieldSerializers, config});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to instantiate 
serializer: " + className, e);
+   }
+
+   }
+
+   private void generateCode(String className) {
+   assert fieldSerializers.length > 0;
+   String typeName = clazz.getCanonicalName();
+   StringBuilder members = new StringBuilder();
+   for (int i = 0; i < fieldSerializers.length; ++i) {
+   members.append(String.format("final TypeSerializer 
f%d;\n", i));
+   }
+   StringBuilder initMembers = new StringBuilder();
+   for 

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread Xazax-hun
Github user Xazax-hun commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78087704
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private final Class clazz;
+   private final Field[] refFields;
+   private final TypeSerializer[] fieldSerializers;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoSerializerGenerator(
+   Class clazz,
+   TypeSerializer[] fields,
+   Field[] reflectiveFields,
+   ExecutionConfig config) {
+   this.clazz = checkNotNull(clazz);
+   this.refFields = checkNotNull(reflectiveFields);
+   this.fieldSerializers = checkNotNull(fields);
+   this.config = checkNotNull(config);
+   for (int i = 0; i < this.refFields.length; i++) {
+   this.refFields[i].setAccessible(true);
+   }
+   }
+
+   public TypeSerializer createSerializer()  {
+   final String className = clazz.getCanonicalName().replace('.', 
'_') + "_GeneratedSerializer";
+   final String fullClassName = packageName + "." + className;
+   Class serializerClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if(config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeSerializerProxy<>(clazz, 
fullClassName, code, fieldSerializers, config);
+   }
+   try {
+   serializerClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
+   }
+   catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + className, e);
+   }
+   Constructor[] ctors = serializerClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeSerializer) ctors[0].newInstance(new 
Object[]{clazz, fieldSerializers, config});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to instantiate 
serializer: " + className, e);
+   }
+
+   }
+
+   private void generateCode(String className) {
+   assert fieldSerializers.length > 0;
+   String typeName = clazz.getCanonicalName();
+   StringBuilder members = new StringBuilder();
+   for (int i = 0; i < fieldSerializers.length; ++i) {
+   members.append(String.format("final TypeSerializer 
f%d;\n", i));
+   }
+   StringBuilder initMembers = new StringBuilder();
+   for (int i = 0; i < fieldSerializers.length; ++i) {
+   initMembers.append(String.format("f%d = 
serializerFields[%d];\n", i, i));
+   }
+   StringBuilder createFields = new StringBuilder();
+   

[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474953#comment-15474953
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user Xazax-hun commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78086011
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
--- End diff --

Because arg usually refers to an object. I will add a comment to clarify 
this.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk





[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread Xazax-hun
Github user Xazax-hun commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78086011
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
--- End diff --

Because arg usually refers to an object. I will add a comment to clarify 
this.




[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-08 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474575#comment-15474575
 ] 

Gabor Gevay commented on FLINK-3322:


I've just realized that what I wrote above doesn't cover the resetting of the 
local strategies. For that, we could add a {{ResettableInputProvider}} 
interface that extends {{CloseableInputProvider}} (or even just add the reset 
method directly to {{CloseableInputProvider}}), make the Sorters implement it, 
and call this new {{reset}} method in {{resetAllInputs}} (instead of calling 
{{close}} and then recreating them).

(But this can go into a separate pull request later.)
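
A minimal sketch of the interface shape being proposed (the names come from 
the comment above; the members of {{CloseableInputProvider}} are elided here, 
so this is an assumption, not the real class):
{code}
/** Stand-in for the existing interface; the real one has more members. */
interface CloseableInputProvider<E> extends java.io.Closeable {
}

interface ResettableInputProvider<E> extends CloseableInputProvider<E> {
    /**
     * Return the provider (e.g. a Sorter) to its initial state while
     * holding on to its memory, so that resetAllInputs() can call reset()
     * instead of close() followed by recreation.
     */
    void reset() throws Exception;
}
{code}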

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than a factor of 
> 10. (It will generate some lookup tables in /tmp on the first run, which 
> takes a few minutes.) (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)





[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-08 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474531#comment-15474531
 ] 

Gabor Gevay commented on FLINK-3322:


I had an offline chat with [~StephanEwen], and an alternative design came up 
which we should also consider:

There is the {{ResettableDriver}} interface, which is implemented by those 
drivers that need to retain some state between iteration steps. The way 
{{AbstractIterativeTask}} uses this interface is that it checks whether the 
driver is an instance of it, and if so it doesn't destroy and recreate the 
driver between iteration steps, but instead just calls {{reset}} on it. We 
could make every driver implement this interface, and in their {{reset}} 
method they would simply hold on to the memory that they already have.

Once this is done, it would also allow some simplification by eliminating the 
special-case handling that distinguishes between resettable and non-resettable 
operators in lots of different places.

Since touching every driver probably can't be avoided anyway, this looks like 
the cleanest solution.
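
The pattern being generalized looks roughly like this inside the iteration 
task (a paraphrase in Java syntax, not the actual {{AbstractIterativeTask}} 
source; {{createDriver}} is a placeholder):
{code}
// Between iteration supersteps:
if (driver instanceof ResettableDriver) {
    // The driver keeps its state -- and, under this proposal, its memory
    // segments -- and is simply reset for the next superstep.
    ((ResettableDriver<?, ?>) driver).reset();
} else {
    // Non-resettable drivers are torn down and recreated, releasing all
    // their memory for the GC to reclaim at every superstep.
    driver.cleanup();
    driver = createDriver();
}
{code}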

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than a factor of 
> 10. (It will generate some lookup tables in /tmp on the first run, which 
> takes a few minutes.) (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)





[jira] [Commented] (FLINK-1526) Add Minimum Spanning Tree library method and example

2016-09-08 Thread Olga Golovneva (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474504#comment-15474504
 ] 

Olga Golovneva commented on FLINK-1526:
---

Hi Greg,
I didn't find many details about this issue either; here is what I have: 
http://mail-archives.apache.org/mod_mbox/flink-dev/201504.mbox/%3ccajz2dcugxrxsdxh_rq4dr52ego+out5oz+5n5xm2an5psrx...@mail.gmail.com%3e
I suppose the problem was in data caching within a loop.
Thanks

> Add Minimum Spanning Tree library method and example
> 
>
> Key: FLINK-1526
> URL: https://issues.apache.org/jira/browse/FLINK-1526
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Vasia Kalavri
>
> This issue proposes the addition of a library method and an example for 
> distributed minimum spanning tree in Gelly.
> The DMST algorithm is very interesting because it is quite different from 
> PageRank-like iterative graph algorithms. It consists of distinct phases 
> inside the same iteration and requires a mechanism to detect convergence of 
> one phase to proceed to the next one. Current implementations in 
> vertex-centric models are quite long (>1000 lines) and hard to understand.
> You can find a description of the algorithm [here | 
> http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf] and [here | 
> http://www.vldb.org/pvldb/vol7/p1047-han.pdf].





[jira] [Commented] (FLINK-4522) Gelly link broken in homepage

2016-09-08 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474418#comment-15474418
 ] 

Greg Hogan commented on FLINK-4522:
---

I am only seeing this affect version 1.2. The 1.1 docs have a single page of 
Gelly documentation.

> Gelly link broken in homepage
> -
>
> Key: FLINK-4522
> URL: https://issues.apache.org/jira/browse/FLINK-4522
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Gelly
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Vasia Kalavri
>Assignee: Greg Hogan
>
> The link to the Gelly documentation is broken on the Flink homepage. The link 
> points to "docs/apis/batch/libs/gelly.md", which has been removed. Since this 
> link might be present in other places as well, e.g. slides, training 
> materials, etc., we should redirect to the new location of the Gelly docs.





[jira] [Closed] (FLINK-4585) Fix broken links in index.md

2016-09-08 Thread Alexander Pivovarov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Pivovarov closed FLINK-4585.
--

> Fix broken links in index.md
> 
>
> Key: FLINK-4585
> URL: https://issues.apache.org/jira/browse/FLINK-4585
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Alexander Pivovarov
>Priority: Minor
>
> The following links are broken
> DataSet API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html
> Table API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html
> Gelly
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html
> correct link: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/gelly/index.html
> The following links show "Page 'X' Has Moved to" for 1-2 sec  and then 
> redirect to another page
> DataStream API
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html
> programming guide
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html
> redirects-to DataSet API: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html
> probably it should be "Basic API Concepts" 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html
> or Quick Start - 
> https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html
> CEP
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html
> ML
> link: 
> http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/ml/index.html
> redirects-to: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/ml/index.html





[jira] [Commented] (FLINK-4591) Select star does not work with grouping

2016-09-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474335#comment-15474335
 ] 

Timo Walther commented on FLINK-4591:
-

Just because it is not allowed in SQL does not mean that we shouldn't allow it 
in the Table API. But at least the star should be supported in a {{select}} 
after a {{groupBy}}.

> Select star does not work with grouping
> ---
>
> Key: FLINK-4591
> URL: https://issues.apache.org/jira/browse/FLINK-4591
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> It would be consistent if this also worked:
> {{table.groupBy('*).select('*)}}
> Currently, the star only works in a plain select without grouping.





[jira] [Updated] (FLINK-4550) Clearly define SQL operator table

2016-09-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4550:

Labels: starter  (was: )

> Clearly define SQL operator table
> -
>
> Key: FLINK-4550
> URL: https://issues.apache.org/jira/browse/FLINK-4550
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> Currently, we use {{SqlStdOperatorTable.instance()}} for setting all 
> supported operations. However, not all of them are actually supported. 
> {{FunctionCatalog}} should only return those operators that are tested and 
> documented.





[jira] [Updated] (FLINK-3656) Rework Table API tests

2016-09-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-3656:

Labels: starter  (was: )

> Rework Table API tests
> --
>
> Key: FLINK-3656
> URL: https://issues.apache.org/jira/browse/FLINK-3656
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Vasia Kalavri
>  Labels: starter
>
> The Table API tests are very inefficient. At the moment they consist mostly 
> of end-to-end integration tests, often testing the same functionality several 
> times (Java/Scala, DataSet/DataStream).
> We should look into how we can rework the Table API tests such that:
> - long-running integration tests are converted into faster unit tests
> - common parts of DataSet and DataStream are only tested once
> - common parts of the Java and Scala Table APIs are only tested once
> - duplicate tests are completely removed





[jira] [Updated] (FLINK-4592) Fix flaky test ScalarFunctionsTest.testCurrentTimePoint

2016-09-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4592:

Labels: starter  (was: )

> Fix flaky test ScalarFunctionsTest.testCurrentTimePoint
> ---
>
> Key: FLINK-4592
> URL: https://issues.apache.org/jira/browse/FLINK-4592
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> It seems that the test is still non-deterministic.
> {code}
> org.apache.flink.api.table.expressions.ScalarFunctionsTest
> testCurrentTimePoint(org.apache.flink.api.table.expressions.ScalarFunctionsTest)
>   Time elapsed: 0.083 sec  <<< FAILURE!
> org.junit.ComparisonFailure: Wrong result for: 
> AS(>=(CHAR_LENGTH(CAST(CURRENT_TIMESTAMP):VARCHAR(1) CHARACTER SET 
> "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 22), '_c0') 
> expected:<[tru]e> but was:<[fals]e>
>   at org.junit.Assert.assertEquals(Assert.java:115)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:126)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:123)
>   at 
> scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:87)
>   at 
> org.apache.flink.api.table.expressions.utils.ExpressionTestBase.evaluateExprs(ExpressionTestBase.scala:123)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}
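A plausible cause of the flakiness (an assumption, not stated in the report): the 
test requires the string form of CURRENT_TIMESTAMP to be at least 22 characters, 
but java.sql.Timestamp.toString() trims trailing zeros from the fractional 
seconds, so a timestamp that lands exactly on a whole second renders as only 21 
characters. A minimal, self-contained sketch of that behavior:

{code}
import java.sql.Timestamp;

public class TimestampLengthDemo {
    public static void main(String[] args) {
        // Whole second: "1970-01-01 00:00:00.0" -> 21 characters, ">= 22" fails
        System.out.println(new Timestamp(0L).toString().length());
        // 123 ms: "1970-01-01 00:00:00.123" -> 23 characters, ">= 22" passes
        System.out.println(new Timestamp(123L).toString().length());
    }
}
{code}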



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474327#comment-15474327
 ] 

Timo Walther commented on FLINK-4565:
-

I'm very sorry; after thinking about it more, maybe I underestimated this issue. 
It is the first combination of expression and table, which makes it tricky. Yes, 
the validation is very complicated, especially because we have two validations, 
one for SQL and one for the Table API (with heavy Scala magic). If you still want 
to do that, I can also help you in a private chat. Otherwise, e.g. FLINK-4599 
would be easier. I will add the "starter" label to easier tasks.

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment

2016-09-08 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4599:

Labels: starter  (was: )

> Add 'explain()' also to StreamTableEnvironment
> --
>
> Key: FLINK-4599
> URL: https://issues.apache.org/jira/browse/FLINK-4599
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> Currently, only the BatchTableEnvironment supports the {{explain}} command 
> for tables. We should also support it for the StreamTableEnvironment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-09-08 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
We will have to fix the issue before we can merge this PR. The timeout 
doesn't occur in non-secure Kafka tests. I think we need to increase the 
timeout or change the test layout. Could you look into that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474208#comment-15474208
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
We will have to fix the issue before we can merge this PR. The timeout 
doesn't occur in non-secure Kafka tests. I think we need to increase the 
timeout or change the test layout. Could you look into that?


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474159#comment-15474159
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2456


> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> expose the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.
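A minimal, self-contained sketch of the proposed shape (all names here are 
illustrative stand-ins, not the actual Flink interfaces): the Task depends only 
on a plain interface, and the actor-backed implementation wraps each call in a 
message sent via the gateway.

{code}
// Hypothetical sketch; names are illustrative only.
interface ActorGateway {                      // stand-in for the Akka gateway
    void tell(Object message);
}

class UpdateTaskExecutionState {              // stand-in message type
    final String taskId;
    final String newState;
    UpdateTaskExecutionState(String taskId, String newState) {
        this.taskId = taskId;
        this.newState = newState;
    }
}

// The Task would depend only on this interface instead of an ActorGateway.
interface TaskManagerActions {
    void updateTaskExecutionState(String taskId, String newState);
}

// Current implementation: wrap the method call in a message and send it via
// the gateway, exactly as the issue describes.
class ActorGatewayTaskManagerActions implements TaskManagerActions {
    private final ActorGateway gateway;

    ActorGatewayTaskManagerActions(ActorGateway gateway) {
        this.gateway = gateway;
    }

    @Override
    public void updateTaskExecutionState(String taskId, String newState) {
        gateway.tell(new UpdateTaskExecutionState(taskId, newState));
    }
}
{code}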



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

2016-09-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2456


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-08 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4456.

Resolution: Fixed

Fixed via 0735b5b935b0c0757943e2d58047afcfb9949560

> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> expose the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-08 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4458.

Resolution: Fixed

Fixed via 02b852e3571e46f25fdfc79f43ceb726ddff9ba7

> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.
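The port-0 trick the description relies on, as a standalone sketch: binding to 
port 0 lets the OS pick a free ephemeral port, which the caller can then read 
back, so parallel test JVMs no longer race for a pre-determined port.

{code}
import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortDemo {
    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any currently free port.
        try (ServerSocket socket = new ServerSocket(0)) {
            System.out.println("OS assigned port: " + socket.getLocalPort());
        }
    }
}
{code}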



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474137#comment-15474137
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2450


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by L...

2016-09-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2450


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-08 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474082#comment-15474082
 ] 

Simone Robutti commented on FLINK-4565:
---

After a couple of days of work, I believe this task is a bit too complex for me. 
I'm struggling with the complexity of constructing the RelNode and doing the 
validation. Lacking an example similar enough to take inspiration from, it's hard 
to work on this by myself. I can imagine that this would take no more than a 
couple of hours for someone with experience with Flink and Calcite, and despite 
all the stuff I've learnt these past few days, I'm seeing no progress. My biggest 
problem is that I cannot really wrap my head around many of the exceptions it 
throws during construction and validation.

If there's someone patient enough to go through what I've done right now, I can 
open a WIP PR or link to my repo, pointing to all the stuff that gets me 
confused. Otherwise I can leave this issue to someone more skilled than me and 
retry with something easier and more straightforward.

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4572) Convert to negative in LongValueToIntValue

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474038#comment-15474038
 ] 

ASF GitHub Bot commented on FLINK-4572:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2469
  
I split the long-to-int translator into signed and unsigned translators so the 
conversion would not be ambiguous. The test will fail without the fix in 
FLINK-4594.


> Convert to negative in LongValueToIntValue
> --
>
> Key: FLINK-4572
> URL: https://issues.apache.org/jira/browse/FLINK-4572
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The Gelly drivers expect that scale 32 edges, represented by the lower 32 
> bits of {{long}} values, can be converted to {{int}} values. Values between 
> 2^31 and 2^32 - 1 should be converted to negative integers, which is not 
> supported by {{MathUtils.checkedDownCast}}.
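A sketch of the conversion at issue (the method name below is illustrative, not 
Gelly's actual translator): mapping the lower 32 bits of a long onto the full 
int range means values in [2^31, 2^32 - 1] wrap to negative ints, which a plain 
narrowing cast does and an upper-bound check forbids.

{code}
public class LongToIntTranslation {
    // Signed view of the lower 32 bits: 0xFFFFFFFFL (2^32 - 1) becomes -1.
    static int toSignedInt(long value) {
        return (int) value;
    }

    public static void main(String[] args) {
        long scale32Edge = (1L << 31) + 5;            // in [2^31, 2^32 - 1]
        System.out.println(toSignedInt(scale32Edge)); // prints -2147483643
        // MathUtils.checkedDownCast would reject this value, since it is
        // larger than Integer.MAX_VALUE.
    }
}
{code}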



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2469: [FLINK-4572] [gelly] Convert to negative in LongValueToIn...

2016-09-08 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2469
  
I split the long-to-int translator into signed and unsigned translators so the 
conversion would not be ambiguous. The test will fail without the fix in 
FLINK-4594.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4594) Validate lower bound in MathUtils.checkedDownCast

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474029#comment-15474029
 ] 

ASF GitHub Bot commented on FLINK-4594:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2481

[FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4594_validate_lower_bound_in_mathutils_checkeddowncast

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2481


commit d273919155cfef48d807b7f93430b52944e145f4
Author: Greg Hogan 
Date:   2016-09-08T14:35:39Z

[FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast




> Validate lower bound in MathUtils.checkedDownCast
> -
>
> Key: FLINK-4594
> URL: https://issues.apache.org/jira/browse/FLINK-4594
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
>
> {{MathUtils.checkedDownCast}} only compares against the upper bound 
> {{Integer.MAX_VALUE}}, which has worked with current usage. 
> Rather than adding a second comparison we can replace
> {noformat}
> if (value > Integer.MAX_VALUE) {
> {noformat}
> with a cast and check
> {noformat}
> if ((int)value != value) { ...
> {noformat}
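The proposed check as a compilable sketch: casting and comparing covers both 
bounds in a single branch, because any long outside [Integer.MIN_VALUE, 
Integer.MAX_VALUE] changes value under the narrowing cast.

{code}
public class CheckedDownCast {
    // Single check covering both bounds, as proposed in the issue.
    static int checkedDownCast(long value) {
        int downCast = (int) value;
        if (downCast != value) {
            throw new IllegalArgumentException(
                "Cannot downcast long value " + value + " to integer.");
        }
        return downCast;
    }

    public static void main(String[] args) {
        System.out.println(checkedDownCast(42L));   // fine
        System.out.println(checkedDownCast(-42L));  // fine
        // checkedDownCast(-(1L << 32)) now throws; the old upper-bound-only
        // check silently down-cast this value to 0.
    }
}
{code}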



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2481: [FLINK-4594] [core] Validate lower bound in MathUt...

2016-09-08 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2481

[FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4594_validate_lower_bound_in_mathutils_checkeddowncast

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2481


commit d273919155cfef48d807b7f93430b52944e145f4
Author: Greg Hogan 
Date:   2016-09-08T14:35:39Z

[FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473966#comment-15473966
 ] 

Timo Walther commented on FLINK-4565:
-

Yes, this would fail then. But I think there is no other good solution if you 
specify expressions as Strings, as we do in the Java API.

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-08 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473867#comment-15473867
 ] 

Simone Robutti commented on FLINK-4565:
---

I'm no expert in the Table API, but if a user doesn't explicitly register a 
table in the table registry, will this fail? 

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78003840
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private final Class clazz;
+   private final Field[] refFields;
+   private final TypeSerializer[] fieldSerializers;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoSerializerGenerator(
+   Class clazz,
+   TypeSerializer[] fields,
+   Field[] reflectiveFields,
+   ExecutionConfig config) {
+   this.clazz = checkNotNull(clazz);
+   this.refFields = checkNotNull(reflectiveFields);
+   this.fieldSerializers = checkNotNull(fields);
+   this.config = checkNotNull(config);
+   for (int i = 0; i < this.refFields.length; i++) {
+   this.refFields[i].setAccessible(true);
+   }
+   }
+
+   public TypeSerializer createSerializer()  {
+   final String className = clazz.getCanonicalName().replace('.', 
'_') + "_GeneratedSerializer";
+   final String fullClassName = packageName + "." + className;
+   Class serializerClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if(config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeSerializerProxy<>(clazz, 
fullClassName, code, fieldSerializers, config);
+   }
+   try {
+   serializerClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
+   }
+   catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + className, e);
+   }
+   Constructor[] ctors = serializerClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeSerializer) ctors[0].newInstance(new 
Object[]{clazz, fieldSerializers, config});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to instantiate 
serializer: " + className, e);
+   }
+
+   }
+
+   private void generateCode(String className) {
+   assert fieldSerializers.length > 0;
+   String typeName = clazz.getCanonicalName();
+   StringBuilder members = new StringBuilder();
+   for (int i = 0; i < fieldSerializers.length; ++i) {
+   members.append(String.format("final TypeSerializer 
f%d;\n", i));
+   }
+   StringBuilder initMembers = new StringBuilder();
+   for (int i = 0; i < fieldSerializers.length; ++i) {
+   initMembers.append(String.format("f%d = 
serializerFields[%d];\n", i, i));
+   }
+   StringBuilder createFields = new StringBuilder();
+  

[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473854#comment-15473854
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78003840
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private final Class clazz;
+   private final Field[] refFields;
+   private final TypeSerializer[] fieldSerializers;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoSerializerGenerator(
+   Class clazz,
+   TypeSerializer[] fields,
+   Field[] reflectiveFields,
+   ExecutionConfig config) {
+   this.clazz = checkNotNull(clazz);
+   this.refFields = checkNotNull(reflectiveFields);
+   this.fieldSerializers = checkNotNull(fields);
+   this.config = checkNotNull(config);
+   for (int i = 0; i < this.refFields.length; i++) {
+   this.refFields[i].setAccessible(true);
+   }
+   }
+
+   public TypeSerializer createSerializer()  {
+   final String className = clazz.getCanonicalName().replace('.', 
'_') + "_GeneratedSerializer";
+   final String fullClassName = packageName + "." + className;
+   Class serializerClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if(config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeSerializerProxy<>(clazz, 
fullClassName, code, fieldSerializers, config);
+   }
+   try {
+   serializerClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
+   }
+   catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + className, e);
+   }
+   Constructor[] ctors = serializerClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeSerializer) ctors[0].newInstance(new 
Object[]{clazz, fieldSerializers, config});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to instantiate 
serializer: " + className, e);
+   }
+
+   }
+
+   private void generateCode(String className) {
+   assert fieldSerializers.length > 0;
+   String typeName = clazz.getCanonicalName();
+   StringBuilder members = new StringBuilder();
+   for (int i = 0; i < fieldSerializers.length; ++i) {
+   members.append(String.format("final TypeSerializer 
f%d;\n", i));
+   }
+   StringBuilder initMembers = new StringBuilder();
+   for (int i 

[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473852#comment-15473852
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78003691
  
--- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class ${className} extends TypeSerializer {
+   private static byte IS_NULL = 1;
+   private static byte NO_SUBCLASS = 2;
+   private static byte IS_SUBCLASS = 4;
+   private static byte IS_TAGGED_SUBCLASS = 8;
+   private int numFields;
+   private ExecutionConfig executionConfig;
+   private Map subclassSerializerCache;
+   private final Map registeredClasses;
+   private final TypeSerializer[] registeredSerializers;
+   Class clazz;
+   <#list members as m>
+   ${m}
+   </#list>
+   public ${className}(Class clazz, TypeSerializer[] serializerFields, 
ExecutionConfig e) {
+   this.clazz = clazz;
+   executionConfig = e;
+   this.numFields = serializerFields.length;
+   LinkedHashSet registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
+   subclassSerializerCache = new HashMap();
+   List cleanedTaggedClasses = new 
ArrayList(registeredPojoTypes.size());
+   for (Class registeredClass: registeredPojoTypes) {
+   if (registeredClass.equals(clazz)) {
+   continue;
+   }
+   if (!clazz.isAssignableFrom(registeredClass)) {
+   continue;
+   }
+   cleanedTaggedClasses.add(registeredClass);
+   }
+   this.registeredClasses = new LinkedHashMap(cleanedTaggedClasses.size());
+   registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+   int id = 0;
+   for (Class registeredClass: cleanedTaggedClasses) {
+   this.registeredClasses.put(registeredClass, id);
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
+   registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+   id++;
+   }
+   <#list initMembers as m>
+   ${m}
+   </#list>
+   }
+   private TypeSerializer getSubclassSerializer(Class subclass) {
+   TypeSerializer result = 
(TypeSerializer)subclassSerializerCache.get(subclass);
+   if (result == null) {
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(subclass);
+   result = typeInfo.createSerializer(executionConfig);
+   subclassSerializerCache.put(subclass, result);
+   }
+   return result;
+   }
+   public boolean isImmutableType() { return false; }
+   public ${className} duplicate() {
+   boolean 

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78003691
  
--- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class ${className} extends TypeSerializer {
+   private static byte IS_NULL = 1;
+   private static byte NO_SUBCLASS = 2;
+   private static byte IS_SUBCLASS = 4;
+   private static byte IS_TAGGED_SUBCLASS = 8;
+   private int numFields;
+   private ExecutionConfig executionConfig;
+   private Map subclassSerializerCache;
+   private final Map registeredClasses;
+   private final TypeSerializer[] registeredSerializers;
+   Class clazz;
+   <#list members as m>
+   ${m}
+   </#list>
+   public ${className}(Class clazz, TypeSerializer[] serializerFields, 
ExecutionConfig e) {
+   this.clazz = clazz;
+   executionConfig = e;
+   this.numFields = serializerFields.length;
+   LinkedHashSet registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
+   subclassSerializerCache = new HashMap();
+   List cleanedTaggedClasses = new 
ArrayList(registeredPojoTypes.size());
+   for (Class registeredClass: registeredPojoTypes) {
+   if (registeredClass.equals(clazz)) {
+   continue;
+   }
+   if (!clazz.isAssignableFrom(registeredClass)) {
+   continue;
+   }
+   cleanedTaggedClasses.add(registeredClass);
+   }
+   this.registeredClasses = new LinkedHashMap(cleanedTaggedClasses.size());
+   registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+   int id = 0;
+   for (Class registeredClass: cleanedTaggedClasses) {
+   this.registeredClasses.put(registeredClass, id);
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
+   registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+   id++;
+   }
+   <#list initMembers as m>
+   ${m}
+   </#list>
+   }
+   private TypeSerializer getSubclassSerializer(Class subclass) {
+   TypeSerializer result = 
(TypeSerializer)subclassSerializerCache.get(subclass);
+   if (result == null) {
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(subclass);
+   result = typeInfo.createSerializer(executionConfig);
+   subclassSerializerCache.put(subclass, result);
+   }
+   return result;
+   }
+   public boolean isImmutableType() { return false; }
+   public ${className} duplicate() {
+   boolean stateful = false;
+   TypeSerializer[] duplicateFieldSerializers = new 
TypeSerializer[numFields];
+   <#list duplicateSerializers as ds>
+   ${ds}
+   </#list>
+   if (stateful) {
+ 

[jira] [Commented] (FLINK-1526) Add Minimum Spanning Tree library method and example

2016-09-08 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473850#comment-15473850
 ] 

Greg Hogan commented on FLINK-1526:
---

Hi Olga, do you have a reference for the for-loop iteration issue as it 
pertains to this algorithm? There wasn't much discussion on this ticket or 
associated pull request. Is the intent to use iterations or a loop with data 
caching?

> Add Minimum Spanning Tree library method and example
> 
>
> Key: FLINK-1526
> URL: https://issues.apache.org/jira/browse/FLINK-1526
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Vasia Kalavri
>
> This issue proposes the addition of a library method and an example for 
> distributed minimum spanning tree in Gelly.
> The DMST algorithm is very interesting because it is quite different from 
> PageRank-like iterative graph algorithms. It consists of distinct phases 
> inside the same iteration and requires a mechanism to detect convergence of 
> one phase to proceed to the next one. Current implementations in 
> vertex-centric models are quite long (>1000 lines) and hard to understand.
> You can find a description of the algorithm [here | 
> http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf] and [here | 
> http://www.vldb.org/pvldb/vol7/p1047-han.pdf].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473836#comment-15473836
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78002589
  
--- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class ${className} extends TypeSerializer {
+   private static byte IS_NULL = 1;
+   private static byte NO_SUBCLASS = 2;
+   private static byte IS_SUBCLASS = 4;
+   private static byte IS_TAGGED_SUBCLASS = 8;
+   private int numFields;
+   private ExecutionConfig executionConfig;
+   private Map subclassSerializerCache;
+   private final Map registeredClasses;
+   private final TypeSerializer[] registeredSerializers;
+   Class clazz;
+   <#list members as m>
+   ${m}
+   </#list>
+   public ${className}(Class clazz, TypeSerializer[] serializerFields, 
ExecutionConfig e) {
+   this.clazz = clazz;
+   executionConfig = e;
+   this.numFields = serializerFields.length;
+   LinkedHashSet registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
+   subclassSerializerCache = new HashMap();
+   List cleanedTaggedClasses = new 
ArrayList(registeredPojoTypes.size());
+   for (Class registeredClass: registeredPojoTypes) {
+   if (registeredClass.equals(clazz)) {
+   continue;
+   }
+   if (!clazz.isAssignableFrom(registeredClass)) {
+   continue;
+   }
+   cleanedTaggedClasses.add(registeredClass);
+   }
+   this.registeredClasses = new LinkedHashMap(cleanedTaggedClasses.size());
+   registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+   int id = 0;
+   for (Class registeredClass: cleanedTaggedClasses) {
+   this.registeredClasses.put(registeredClass, id);
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
+   registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+   id++;
+   }
+   <#list initMembers as m>
+   ${m}
+   </#list>
+   }
+   private TypeSerializer getSubclassSerializer(Class subclass) {
+   TypeSerializer result = 
(TypeSerializer)subclassSerializerCache.get(subclass);
+   if (result == null) {
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(subclass);
+   result = typeInfo.createSerializer(executionConfig);
+   subclassSerializerCache.put(subclass, result);
+   }
+   return result;
+   }
+   public boolean isImmutableType() { return false; }
+   public ${className} duplicate() {
+   boolean 

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78002589
  
--- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class ${className} extends TypeSerializer {
+   private static byte IS_NULL = 1;
+   private static byte NO_SUBCLASS = 2;
+   private static byte IS_SUBCLASS = 4;
+   private static byte IS_TAGGED_SUBCLASS = 8;
+   private int numFields;
+   private ExecutionConfig executionConfig;
+   private Map subclassSerializerCache;
+   private final Map registeredClasses;
+   private final TypeSerializer[] registeredSerializers;
+   Class clazz;
+   <#list members as m>
+   ${m}
+   </#list>
+   public ${className}(Class clazz, TypeSerializer[] serializerFields, 
ExecutionConfig e) {
+   this.clazz = clazz;
+   executionConfig = e;
+   this.numFields = serializerFields.length;
+   LinkedHashSet registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
+   subclassSerializerCache = new HashMap();
+   List cleanedTaggedClasses = new 
ArrayList(registeredPojoTypes.size());
+   for (Class registeredClass: registeredPojoTypes) {
+   if (registeredClass.equals(clazz)) {
+   continue;
+   }
+   if (!clazz.isAssignableFrom(registeredClass)) {
+   continue;
+   }
+   cleanedTaggedClasses.add(registeredClass);
+   }
+   this.registeredClasses = new LinkedHashMap(cleanedTaggedClasses.size());
+   registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+   int id = 0;
+   for (Class registeredClass: cleanedTaggedClasses) {
+   this.registeredClasses.put(registeredClass, id);
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
+   registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+   id++;
+   }
+   <#list initMembers as m>
+   ${m}
+   </#list>
+   }
+   private TypeSerializer getSubclassSerializer(Class subclass) {
+   TypeSerializer result = 
(TypeSerializer)subclassSerializerCache.get(subclass);
+   if (result == null) {
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(subclass);
+   result = typeInfo.createSerializer(executionConfig);
+   subclassSerializerCache.put(subclass, result);
+   }
+   return result;
+   }
+   public boolean isImmutableType() { return false; }
+   public ${className} duplicate() {
+   boolean stateful = false;
+   TypeSerializer[] duplicateFieldSerializers = new 
TypeSerializer[numFields];
+   <#list duplicateSerializers as ds>
+   ${ds}
+   </#list>
+   if (stateful) {
+ 

[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473833#comment-15473833
 ] 

Timo Walther commented on FLINK-4565:
-

I think it is OK if you just parse the table name and introduce an 
`UnresolvedTable` expression, which can later be resolved/looked up in the table 
registry.
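
A rough sketch of that idea (the real Table API expressions are Scala case 
classes; this Java rendering and all names in it are illustrative only): parsing 
produces an unresolved placeholder carrying just the table name, and validation 
swaps it for the registered table or fails loudly.

{code}
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins for the Table API's expression tree.
interface Expression {}

final class UnresolvedTable implements Expression {
    final String name;                 // only the parsed table name is kept
    UnresolvedTable(String name) { this.name = name; }
}

final class ResolvedTable implements Expression {
    final Object table;                // stand-in for the actual Table
    ResolvedTable(Object table) { this.table = table; }
}

final class TableRegistry {
    private final Map<String, Object> tables = new HashMap<>();

    void register(String name, Object table) { tables.put(name, table); }

    // Resolution step: look the name up during validation.
    Expression resolve(UnresolvedTable unresolved) {
        Object table = tables.get(unresolved.name);
        if (table == null) {
            throw new RuntimeException("Table not registered: " + unresolved.name);
        }
        return new ResolvedTable(table);
    }
}
{code}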

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473826#comment-15473826
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78001809
  
--- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class ${className} extends TypeSerializer {
+   private static byte IS_NULL = 1;
+   private static byte NO_SUBCLASS = 2;
+   private static byte IS_SUBCLASS = 4;
+   private static byte IS_TAGGED_SUBCLASS = 8;
+   private int numFields;
+   private ExecutionConfig executionConfig;
+   private Map subclassSerializerCache;
+   private final Map registeredClasses;
+   private final TypeSerializer[] registeredSerializers;
+   Class clazz;
+   <#list members as m>
+   ${m}
+   </#list>
+   public ${className}(Class clazz, TypeSerializer[] serializerFields, 
ExecutionConfig e) {
+   this.clazz = clazz;
+   executionConfig = e;
+   this.numFields = serializerFields.length;
+   LinkedHashSet registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
+   subclassSerializerCache = new HashMap();
+   List cleanedTaggedClasses = new 
ArrayList(registeredPojoTypes.size());
+   for (Class registeredClass: registeredPojoTypes) {
+   if (registeredClass.equals(clazz)) {
+   continue;
+   }
+   if (!clazz.isAssignableFrom(registeredClass)) {
+   continue;
+   }
+   cleanedTaggedClasses.add(registeredClass);
+   }
+   this.registeredClasses = new LinkedHashMap(cleanedTaggedClasses.size());
+   registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+   int id = 0;
+   for (Class registeredClass: cleanedTaggedClasses) {
+   this.registeredClasses.put(registeredClass, id);
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
+   registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+   id++;
+   }
+   <#list initMembers as m>
+   ${m}
+   </#list>
+   }
+   private TypeSerializer getSubclassSerializer(Class subclass) {
+   TypeSerializer result = 
(TypeSerializer)subclassSerializerCache.get(subclass);
+   if (result == null) {
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(subclass);
+   result = typeInfo.createSerializer(executionConfig);
+   subclassSerializerCache.put(subclass, result);
+   }
+   return result;
+   }
+   public boolean isImmutableType() { return false; }
+   public ${className} duplicate() {
+   boolean 

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78001809
  
--- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class ${className} extends TypeSerializer {
+   private static byte IS_NULL = 1;
+   private static byte NO_SUBCLASS = 2;
+   private static byte IS_SUBCLASS = 4;
+   private static byte IS_TAGGED_SUBCLASS = 8;
+   private int numFields;
+   private ExecutionConfig executionConfig;
+   private Map subclassSerializerCache;
+   private final Map registeredClasses;
+   private final TypeSerializer[] registeredSerializers;
+   Class clazz;
+   <#list members as m>
+   ${m}
+   </#list>
+   public ${className}(Class clazz, TypeSerializer[] serializerFields, 
ExecutionConfig e) {
+   this.clazz = clazz;
+   executionConfig = e;
+   this.numFields = serializerFields.length;
+   LinkedHashSet registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
+   subclassSerializerCache = new HashMap();
+   List cleanedTaggedClasses = new 
ArrayList(registeredPojoTypes.size());
+   for (Class registeredClass: registeredPojoTypes) {
+   if (registeredClass.equals(clazz)) {
+   continue;
+   }
+   if (!clazz.isAssignableFrom(registeredClass)) {
+   continue;
+   }
+   cleanedTaggedClasses.add(registeredClass);
+   }
+   this.registeredClasses = new LinkedHashMap(cleanedTaggedClasses.size());
+   registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+   int id = 0;
+   for (Class registeredClass: cleanedTaggedClasses) {
+   this.registeredClasses.put(registeredClass, id);
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
+   registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+   id++;
+   }
+   <#list initMembers as m>
+   ${m}
+   </#list>
+   }
+   private TypeSerializer getSubclassSerializer(Class subclass) {
+   TypeSerializer result = 
(TypeSerializer)subclassSerializerCache.get(subclass);
+   if (result == null) {
+   TypeInformation typeInfo = 
TypeExtractor.createTypeInfo(subclass);
+   result = typeInfo.createSerializer(executionConfig);
+   subclassSerializerCache.put(subclass, result);
+   }
+   return result;
+   }
+   public boolean isImmutableType() { return false; }
+   public ${className} duplicate() {
+   boolean stateful = false;
+   TypeSerializer[] duplicateFieldSerializers = new 
TypeSerializer[numFields];
+   <#list duplicateSerializers as ds>
+   ${ds}
+   </#list>
+   if (stateful) {
+ 

[jira] [Assigned] (FLINK-4579) Add StateBackendFactory for RocksDB Backend

2016-09-08 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-4579:
--

Assignee: Jark Wu

> Add StateBackendFactory for RocksDB Backend
> ---
>
> Key: FLINK-4579
> URL: https://issues.apache.org/jira/browse/FLINK-4579
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Jark Wu
>
> Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}} 
> which means that users cannot specify to use the RocksDB backend in the flink 
> configuration.
> If we add a factory for rocksdb we should also think about adding the rocksdb 
> backend to the standard distribution lib, otherwise it is only usable if 
> users manually place the rocks jars in the Flink lib folder.
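A hedged sketch of what such a factory could look like (the interface shape, 
class names, and configuration key below are assumptions modeled on the 
FsStateBackend's factory, not the final API): the factory reads the backend's 
settings from the configuration and instantiates the backend.

{code}
// Illustrative sketch only; the real interface and config keys may differ.
interface StateBackend {}                      // stand-in for AbstractStateBackend

class RocksDBStateBackend implements StateBackend {
    private final String checkpointPath;
    RocksDBStateBackend(String checkpointPath) {
        this.checkpointPath = checkpointPath;
    }
}

interface StateBackendFactory<T extends StateBackend> {
    T createFromConfig(java.util.Properties config) throws Exception;
}

class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBStateBackend> {
    @Override
    public RocksDBStateBackend createFromConfig(java.util.Properties config)
            throws Exception {
        // Hypothetical key, mirroring how a checkpoint directory is configured.
        String path = config.getProperty("state.backend.rocksdb.checkpointdir");
        if (path == null) {
            throw new IllegalArgumentException(
                "Cannot create RocksDB state backend: no checkpoint directory configured.");
        }
        return new RocksDBStateBackend(path);
    }
}
{code}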



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473806#comment-15473806
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r78000250
  
--- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase;
+import org.apache.flink.api.java.typeutils.runtime.GenTypeComparatorProxy;
+import org.apache.flink.util.InstantiationUtil;
+
+public final class ${className} extends CompositeTypeComparator implements 
java.io.Serializable {
--- End diff --

Maybe add `<${className}>` as generic argument to `CompositeTypeComparator`


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
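
To make the reflection cost concrete, an illustrative contrast (not code from
the proposal; names are made up):

    import java.lang.reflect.Field;

    class Pojo { public long value; }

    class FieldAccessDemo {
        // Reflective access, as the current PojoSerializer does: Field.get
        // boxes the primitive and is hard for the JIT to inline.
        static long reflective(Pojo p, Field f) throws IllegalAccessException {
            return (Long) f.get(p);
        }

        // Direct access, as generated serializer code could emit instead.
        static long direct(Pojo p) {
            return p.value;
        }
    }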



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77999725
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+
+public final class PojoComparatorGenerator {
+   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
+
+   private transient Field[] keyFields;
+   private transient Integer[] keyFieldIds;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+   private final Class type;
+   private final ExecutionConfig config;
+   private String code;
+
+   public PojoComparatorGenerator(Field[] keyFields, TypeComparator[] 
comparators, TypeSerializer serializer,
+   
Class type, Integer[] keyFieldIds, ExecutionConfig config) {
+   this.keyFields = keyFields;
+   this.comparators = (TypeComparator[]) comparators;
+
+   this.type = type;
+   this.serializer = serializer;
+   this.keyFieldIds = keyFieldIds;
+   this.config = config;
+   }
+
+   public TypeComparator createComparator() {
+   // Multiple comparators can be generated for each type based on 
a list of keys. The list of keys and the type
+   // name should determine the generated comparator. This 
information is used for caching (avoiding
+   // recompilation). Note that, the name of the field is not 
sufficient because nested POJOs might have a field
+   // with the name.
+   StringBuilder keyBuilder = new StringBuilder();
+   for(Integer i : keyFieldIds) {
+   keyBuilder.append(i);
+   keyBuilder.append("_");
+   }
+   final String className = type.getCanonicalName().replace('.', 
'_') + "_GeneratedComparator" +
+   keyBuilder.toString();
+   final String fullClassName = packageName + "." + className;
+   Class comparatorClazz;
+   code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+   if (code == null) {
+   generateCode(className);
+   }
+   if (config.isWrapGeneratedClassesEnabled()) {
+   return new GenTypeComparatorProxy<>(type, 
fullClassName, code, comparators, serializer);
+   }
+   try {
+   comparatorClazz = 
InstantiationUtil.compile(type.getClassLoader(), fullClassName, code);
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
comparator: " + className, e);
+   }
+   Constructor[] ctors = comparatorClazz.getConstructors();
+   assert ctors.length == 1;
+   try {
+   return (TypeComparator) ctors[0].newInstance(new 
Object[]{comparators, serializer, type});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to instantiate 
comparator using: " + ctors[0].getName(), e);
+   }
+   }
+
+
+   private void generateCode(String className) {
+   String typeName = type.getCanonicalName();
+   StringBuilder 
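
(For illustration of the caching scheme in createComparator above: for a
hypothetical POJO com.example.MyPojo keyed on fields 0 and 2, the code would
build

    // className     = "com_example_MyPojo_GeneratedComparator0_2_"
    // fullClassName = "org.apache.flink.api.java.typeutils.runtime.generated."
    //                 + "com_example_MyPojo_GeneratedComparator0_2_"

so recompilation is skipped whenever the same type/key combination is seen
again.)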

[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473775#comment-15473775
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997800
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 
// now create the operator and give it the output collector to 
write its output to
OneInputStreamOperator chainedOperator = 
operatorConfig.getStreamOperator(userCodeClassloader);
-   chainedOperator.setup(containingTask, operatorConfig, output);
+   chainedOperator.setup(containingTask, operatorConfig, output, 
streamOutputs.size() == 0);
--- End diff --

could we not set this value in the `operatorConfig` instead?
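
A sketch of that alternative, with hypothetical StreamConfig accessors (the
actual method names would be up to the PR):

    // when the chain is built:
    operatorConfig.setChainEnd(streamOutputs.size() == 0);  // hypothetical setter

    // later, inside the operator's setup():
    this.isSink = config.isChainEnd();                      // hypothetical getter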


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanager's clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it 
> would still lead to reasonably good estimations.
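
To make the mechanism concrete, a rough sketch of what such a marker and the
sink-side computation could look like (illustrative only, not the actual
classes of this work):

    // A mark records when it left the source; a sink estimates latency on
    // receipt. This assumes the TaskManager clocks are in sync, as noted above.
    public final class LatencyMark {
        private final long markedTime;  // System.currentTimeMillis() at the source
        private final int sourceId;

        public LatencyMark(long markedTime, int sourceId) {
            this.markedTime = markedTime;
            this.sourceId = sourceId;
        }

        public long getMarkedTime() { return markedTime; }
        public int getSourceId() { return sourceId; }
    }

    // At a sink, for a received mark:
    long latencyMillis = System.currentTimeMillis() - mark.getMarkedTime();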



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473764#comment-15473764
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997086
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
 ---
@@ -83,6 +83,7 @@ public void processWatermark2(Watermark mark) throws 
Exception {
}
}
 
+
--- End diff --

unnecessary new line


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanager's clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it 
> would still lead to reasonably good estimations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473762#comment-15473762
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77997021
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -347,4 +521,5 @@ public void close() {
output.close();
}
}
+
--- End diff --

unnecessary new line


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanager's clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it 
> would still lead to reasonably good estimations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473760#comment-15473760
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996891
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -94,20 +102,38 @@
/** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
private transient KeyedStateBackend keyedStateBackend;
 
-   protected transient MetricGroup metrics;
+   // --- Metrics ---
+
+   /** Metric group for the operator */
+   protected MetricGroup metrics;
+
+   /** Flag indicating if this operator is a sink */
+   protected transient boolean isSink = false;
+
+   protected LatencyGauge latencyGauge;
+
 
// 

//  Life Cycle
// 

 
@Override
-   public void setup(StreamTask containingTask, StreamConfig config, 
Output output) {
+   public void setup(StreamTask containingTask, StreamConfig config, 
Output output, boolean isSink) {
this.container = containingTask;
this.config = config;
String operatorName = 
containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   this.isSink = isSink;
+   Configuration taskManagerConfig = 
container.getEnvironment().getTaskManagerInfo().getConfiguration();
+   int historySize = 
taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+   if(historySize <= 0) {
+   LOG.warn("{} has been set to a value below 0: {}. Using 
default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
+   historySize = 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+   }
+
+   latencyGauge = new LatencyGauge(this.metrics, historySize, 
!isSink);
--- End diff --

this looks a bit odd; `latencyGauge = this.metrics.gauge(new 
LatencyGauge(historySize, isSink), "latency")` would be more consistent with how 
other metrics are registered.
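
For reference, a sketch of that registration shape, assuming the
MetricGroup#gauge(String, Gauge) signature used for the other metrics:

    // register the gauge with the operator's metric group and keep the reference:
    latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize, !isSink));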


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanager's clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it 
> would still lead to reasonably good estimations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473754#comment-15473754
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996545
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -94,20 +102,38 @@
/** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
private transient KeyedStateBackend keyedStateBackend;
 
-   protected transient MetricGroup metrics;
+   // --- Metrics ---
+
+   /** Metric group for the operator */
+   protected MetricGroup metrics;
+
+   /** Flag indicating if this operator is a sink */
+   protected transient boolean isSink = false;
+
+   protected LatencyGauge latencyGauge;
+
 
// 

//  Life Cycle
// 

 
@Override
-   public void setup(StreamTask containingTask, StreamConfig config, 
Output output) {
+   public void setup(StreamTask containingTask, StreamConfig config, 
Output output, boolean isSink) {
this.container = containingTask;
this.config = config;
String operatorName = 
containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

this.metrics = 
container.getEnvironment().getMetricGroup().addOperator(operatorName);
this.output = new CountingOutput(output, 
this.metrics.counter("numRecordsOut"));
+   this.isSink = isSink;
+   Configuration taskManagerConfig = 
container.getEnvironment().getTaskManagerInfo().getConfiguration();
+   int historySize = 
taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+   if(historySize <= 0) {
--- End diff --

missing space after if


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanager's clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it 
> would still lead to reasonably good estimations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473751#comment-15473751
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996378
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -149,6 +154,8 @@
 
private LinkedHashSet registeredPojoTypes = new 
LinkedHashSet<>();
 
+
--- End diff --

new lines should be removed


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanager's clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it 
> would still lead to reasonably good estimations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473745#comment-15473745
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996152
  
--- Diff: docs/monitoring/metrics.md ---
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
   
 
 
-  
-Task
-currentLowWatermark
-The lowest watermark a task has received.
-  
-  
-lastCheckpointDuration
-The time it took to complete the last checkpoint.
-  
-  
-lastCheckpointSize
-The total size of the last checkpoint.
-  
-  
-restartingTime
-The time it took to restart the job.
-  
-  
-numBytesInLocal
-The total number of bytes this task has read from a local 
source.
-  
-  
-numBytesInRemote
-The total number of bytes this task has read from a remote 
source.
-  
-  
-numBytesOut
-The total number of bytes this task has emitted.
-  
-
-
-  
-Operator
-numRecordsIn
-The total number of records this operator has received.
-  
-  
-numRecordsOut
-The total number of records this operator has emitted.
-  
-  
-numSplitsProcessed
-The total number of InputSplits this data source has 
processed.
-  
+  Task
+  currentLowWatermark
+  The lowest watermark a task has received.
+
+
+  lastCheckpointDuration
+  The time it took to complete the last checkpoint.
+
+
+  lastCheckpointSize
+  The total size of the last checkpoint.
+
+
+  restartingTime
+  The time it took to restart the job.
+
+
+  numBytesInLocal
+  The total number of bytes this task has read from a local 
source.
+
+
+  numBytesInRemote
+  The total number of bytes this task has read from a remote 
source.
+
+
+  numBytesOut
+  The total number of bytes this task has emitted.
+
+
+  Operator
+  numRecordsIn
+  The total number of records this operator has received.
+
+
+  numRecordsOut
+  The total number of records this operator has emitted.
+
+
+  numSplitsProcessed
+  The total number of InputSplits this data source has processed 
(if the operator is a data source).
+
+
+  latency
+  A latency gauge reporting the latency distribution from the 
different sources.
 
   
 
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. 
To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive 
value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a 
special record, called a `LatencyMaker`.
--- End diff --

LatencyMaker -> LatencyMarker
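
For context, enabling the tracking this documentation section describes would
look roughly like the following, assuming the ExecutionConfig setter added
with this feature:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // emit a latency marker from every source once per second:
    env.getConfig().setLatencyTrackingInterval(1000L);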


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly to regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> 

[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473744#comment-15473744
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77996133
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
--- End diff --

Please add javadoc and audience/stability annotation (probably Internal).
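
A sketch of what the requested additions could look like, assuming Flink's
@Internal annotation from flink-annotations:

    /**
     * Generates Java source for POJO serializers and compiles it at runtime.
     */
    @Internal
    public final class PojoSerializerGenerator {
        // class body as in the PR
    }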


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473747#comment-15473747
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996178
  
--- Diff: docs/monitoring/metrics.md ---
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
   
 
 
-  
-Task
-currentLowWatermark
-The lowest watermark a task has received.
-  
-  
-lastCheckpointDuration
-The time it took to complete the last checkpoint.
-  
-  
-lastCheckpointSize
-The total size of the last checkpoint.
-  
-  
-restartingTime
-The time it took to restart the job.
-  
-  
-numBytesInLocal
-The total number of bytes this task has read from a local 
source.
-  
-  
-numBytesInRemote
-The total number of bytes this task has read from a remote 
source.
-  
-  
-numBytesOut
-The total number of bytes this task has emitted.
-  
-
-
-  
-Operator
-numRecordsIn
-The total number of records this operator has received.
-  
-  
-numRecordsOut
-The total number of records this operator has emitted.
-  
-  
-numSplitsProcessed
-The total number of InputSplits this data source has 
processed.
-  
+  Task
+  currentLowWatermark
+  The lowest watermark a task has received.
+
+
+  lastCheckpointDuration
+  The time it took to complete the last checkpoint.
+
+
+  lastCheckpointSize
+  The total size of the last checkpoint.
+
+
+  restartingTime
+  The time it took to restart the job.
+
+
+  numBytesInLocal
+  The total number of bytes this task has read from a local 
source.
+
+
+  numBytesInRemote
+  The total number of bytes this task has read from a remote 
source.
+
+
+  numBytesOut
+  The total number of bytes this task has emitted.
+
+
+  Operator
+  numRecordsIn
+  The total number of records this operator has received.
+
+
+  numRecordsOut
+  The total number of records this operator has emitted.
+
+
+  numSplitsProcessed
+  The total number of InputSplits this data source has processed 
(if the operator is a data source).
+
+
+  latency
+  A latency gauge reporting the latency distribution from the 
different sources.
 
   
 
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. 
To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive 
value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a 
special record, called a `LatencyMaker`.
+The marker contains a timestamp from the time when the record has been 
emitted at the sources.
+Latency marker can not overtake regular user records, thus if records are 
queuing up in front of an operator, 
--- End diff --

marker -> markers


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMillis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, 

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77996133
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
--- End diff --

Please add javadoc and audience/stability annotation (probably Internal).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996178
  
--- Diff: docs/monitoring/metrics.md ---
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
   
 
 
-  
-Task
-currentLowWatermark
-The lowest watermark a task has received.
-  
-  
-lastCheckpointDuration
-The time it took to complete the last checkpoint.
-  
-  
-lastCheckpointSize
-The total size of the last checkpoint.
-  
-  
-restartingTime
-The time it took to restart the job.
-  
-  
-numBytesInLocal
-The total number of bytes this task has read from a local 
source.
-  
-  
-numBytesInRemote
-The total number of bytes this task has read from a remote 
source.
-  
-  
-numBytesOut
-The total number of bytes this task has emitted.
-  
-
-
-  
-Operator
-numRecordsIn
-The total number of records this operator has received.
-  
-  
-numRecordsOut
-The total number of records this operator has emitted.
-  
-  
-numSplitsProcessed
-The total number of InputSplits this data source has 
processed.
-  
+  Task
+  currentLowWatermark
+  The lowest watermark a task has received.
+
+
+  lastCheckpointDuration
+  The time it took to complete the last checkpoint.
+
+
+  lastCheckpointSize
+  The total size of the last checkpoint.
+
+
+  restartingTime
+  The time it took to restart the job.
+
+
+  numBytesInLocal
+  The total number of bytes this task has read from a local 
source.
+
+
+  numBytesInRemote
+  The total number of bytes this task has read from a remote 
source.
+
+
+  numBytesOut
+  The total number of bytes this task has emitted.
+
+
+  Operator
+  numRecordsIn
+  The total number of records this operator has received.
+
+
+  numRecordsOut
+  The total number of records this operator has emitted.
+
+
+  numSplitsProcessed
+  The total number of InputSplits this data source has processed 
(if the operator is a data source).
+
+
+  latency
+  A latency gauge reporting the latency distribution from the 
different sources.
 
   
 
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. 
To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive 
value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a 
special record, called a `LatencyMaker`.
+The marker contains a timestamp from the time when the record has been 
emitted at the sources.
+Latency marker can not overtake regular user records, thus if records are 
queuing up in front of an operator, 
--- End diff --

marker -> markers


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...

2016-09-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2386#discussion_r77996152
  
--- Diff: docs/monitoring/metrics.md ---
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
   
 
 
-  
-Task
-currentLowWatermark
-The lowest watermark a task has received.
-  
-  
-lastCheckpointDuration
-The time it took to complete the last checkpoint.
-  
-  
-lastCheckpointSize
-The total size of the last checkpoint.
-  
-  
-restartingTime
-The time it took to restart the job.
-  
-  
-numBytesInLocal
-The total number of bytes this task has read from a local 
source.
-  
-  
-numBytesInRemote
-The total number of bytes this task has read from a remote 
source.
-  
-  
-numBytesOut
-The total number of bytes this task has emitted.
-  
-
-
-  
-Operator
-numRecordsIn
-The total number of records this operator has received.
-  
-  
-numRecordsOut
-The total number of records this operator has emitted.
-  
-  
-numSplitsProcessed
-The total number of InputSplits this data source has 
processed.
-  
+  Task
+  currentLowWatermark
+  The lowest watermark a task has received.
+
+
+  lastCheckpointDuration
+  The time it took to complete the last checkpoint.
+
+
+  lastCheckpointSize
+  The total size of the last checkpoint.
+
+
+  restartingTime
+  The time it took to restart the job.
+
+
+  numBytesInLocal
+  The total number of bytes this task has read from a local 
source.
+
+
+  numBytesInRemote
+  The total number of bytes this task has read from a remote 
source.
+
+
+  numBytesOut
+  The total number of bytes this task has emitted.
+
+
+  Operator
+  numRecordsIn
+  The total number of records this operator has received.
+
+
+  numRecordsOut
+  The total number of records this operator has emitted.
+
+
+  numSplitsProcessed
+  The total number of InputSplits this data source has processed 
(if the operator is a data source).
+
+
+  latency
+  A latency gauge reporting the latency distribution from the 
different sources.
 
   
 
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. 
To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive 
value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a 
special record, called a `LatencyMaker`.
--- End diff --

LatencyMaker -> LatencyMarker


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473738#comment-15473738
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995790
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeSerializerProxy.java
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+
+public class GenTypeSerializerProxy extends TypeSerializer {
--- End diff --

Please add javadoc.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
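
The reflection bottleneck the issue description refers to can be seen in
miniature. The following is a toy illustration, not Flink code: the class
`Point` and the object `ReflectionVsDirect` are invented for this sketch. The
reflective path mirrors what the PojoSerializer does today; the direct read is
what generated code can emit instead.

```
import java.lang.reflect.Field

class Point(var x: Int) // stands in for a field of a user POJO

object ReflectionVsDirect {
  def main(args: Array[String]): Unit = {
    val p = new Point(1)
    // Reflective access, as the PojoSerializer does today:
    val f: Field = classOf[Point].getDeclaredField("x")
    f.setAccessible(true)
    val viaReflection = f.get(p) // boxes the int to java.lang.Integer
    // Direct access, which generated serializer code can emit:
    val direct = p.x             // plain field read, no boxing
    println(s"$viaReflection $direct")
  }
}
```
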


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995790
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeSerializerProxy.java
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+
+public class GenTypeSerializerProxy extends TypeSerializer {
--- End diff --

Please add javadoc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995491
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
--- End diff --

Please add javadoc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473732#comment-15473732
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995491
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
--- End diff --

Please add javadoc.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473725#comment-15473725
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995157
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
+   private final String code;
+   private final String name;
+   private final Class clazz;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+
+   transient private CompositeTypeComparator impl = null;
+
+   private void compile() {
+   try {
+   assert impl == null;
+   Class comparatorClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), name, code);
+   Constructor[] ctors = 
comparatorClazz.getConstructors();
+   assert ctors.length == 1;
+   impl = (CompositeTypeComparator) 
ctors[0].newInstance(new Object[]{comparators, serializer, clazz});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + name, e);
+   }
+   }
+
+   public GenTypeComparatorProxy(Class clazz, String name, String 
code,TypeComparator[] comparators,
+   
TypeSerializer serializer) {
+   this.name = name;
+   this.code = code;
+   this.clazz = clazz;
+   this.comparators = comparators;
+   this.serializer = serializer;
+   compile();
+   }
+
+   @SuppressWarnings("unchecked")
--- End diff --

Not needed if `comparators` has ``


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
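
The proxy under review, reduced to a toy sketch: this is not the PR's
GenTypeComparatorProxy, and `Cmp` and the injected `compile` hook are invented
stand-ins. It only shows the pattern visible in the diff above, namely keeping
the generated source as a serializable `String` and compiling it lazily on
first use after deserialization. The real class delegates every
TypeComparator method, not just `compare()`.

```
trait Cmp[T] extends Serializable { def compare(a: T, b: T): Int }

class GeneratedCmpProxy[T](code: String, compile: String => Cmp[T])
    extends Cmp[T] {
  // The compiled implementation is not serializable, so it is rebuilt
  // once per JVM after deserialization.
  @transient private var impl: Cmp[T] = _

  private def target: Cmp[T] = {
    if (impl == null) {
      impl = compile(code) // assumption: the compile hook is itself serializable
    }
    impl
  }

  override def compare(a: T, b: T): Int = target.compare(a, b)
}
```
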


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77995157
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+public class GenTypeComparatorProxy extends CompositeTypeComparator 
implements java.io.Serializable {
+   private final String code;
+   private final String name;
+   private final Class clazz;
+   private final TypeComparator[] comparators;
+   private final TypeSerializer serializer;
+
+   transient private CompositeTypeComparator impl = null;
+
+   private void compile() {
+   try {
+   assert impl == null;
+   Class comparatorClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), name, code);
+   Constructor[] ctors = 
comparatorClazz.getConstructors();
+   assert ctors.length == 1;
+   impl = (CompositeTypeComparator) 
ctors[0].newInstance(new Object[]{comparators, serializer, clazz});
+   } catch (Exception e) {
+   throw new RuntimeException("Unable to generate 
serializer: " + name, e);
+   }
+   }
+
+   public GenTypeComparatorProxy(Class clazz, String name, String 
code,TypeComparator[] comparators,
+   
TypeSerializer serializer) {
+   this.name = name;
+   this.code = code;
+   this.clazz = clazz;
+   this.comparators = comparators;
+   this.serializer = serializer;
+   compile();
+   }
+
+   @SuppressWarnings("unchecked")
--- End diff --

Not needed if `comparators` has ``


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r7799
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(setterName, f.getType());
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
--- End diff --

Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473715#comment-15473715
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r7799
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(setterName, f.getType());
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
--- End diff --

Same as above.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473702#comment-15473702
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993831
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
--- End diff --

Why do we need to cast to the boxed types if the field is non-boxed?


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993831
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
--- End diff --

Why do we need to cast to the boxed types if the field is non-boxed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473689#comment-15473689
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993022
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
--- End diff --

Is this possible at this point? The rules of being a POJO should not allow 
this, right?


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473692#comment-15473692
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993053
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(setterName, f.getType());
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
--- End diff --

Same question as above.


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993022
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
--- End diff --

Is this possible at this point? The rules of being a POJO should not allow 
this, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

2016-09-08 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2211#discussion_r77993053
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java 
---
@@ -419,4 +481,61 @@ public String toString() {
return "NamedFlatFieldDescriptor [name="+fieldName+" 
position="+getPosition()+" typeInfo="+getType()+"]";
}
}
+
+   public static String accessStringForField(Field f) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   return fieldName;
+   }
+   String getterName = "get" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(getterName, new Class[0]);
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
+   return fieldName + "()";
+   }
+   return getterName + "()";
+   }
+
+   public static String modifyStringForField(Field f, String arg) {
+   String fieldName = f.getName();
+   if (Modifier.isPublic(f.getModifiers())) {
+   if (f.getType().isPrimitive()) {
+   return f.getName() + " = (" +
+   
primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + 
")" + arg;
+   } else {
+   return f.getName() + " = (" + 
f.getType().getCanonicalName() + ")" + arg;
+   }
+   }
+   String setterName = "set" + 
Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+   Class parentClazz = f.getDeclaringClass();
+   try {
+   parentClazz.getMethod(setterName, f.getType());
+   } catch (NoSuchMethodException e) {
+   // No getter, it might be a scala class.
--- End diff --

Same question as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-09-08 Thread Simone Robutti (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473685#comment-15473685
 ] 

Simone Robutti commented on FLINK-4565:
---

I defined this parser combinator rule:

```
lazy val in: PackratParser[Expression] = term ~ "IN" ~ table ^^ {
  case l ~ _ ~ r => In(l, r)
}
```

`In` takes an expression (containing the field of the left table to be matched
against the right table) and a Table. I need to define a parser for a Table,
but I have never worked with parser combinators, so I have no idea how to do
that. Any suggestions?

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
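
For reference, here is a self-contained toy grammar in the same
parser-combinator style as the rule quoted above. This is not the Table API's
actual ExpressionParser: `ToyInParser`, `FieldRef`, and `tableRef` are invented
for illustration, and the sketch needs the scala-parser-combinators library on
the classpath. Since a `Table` object cannot appear inside a string, the sketch
parses a table *name*, which would have to be resolved to a `Table` in a later
step.

```
import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}

sealed trait Expr
case class FieldRef(name: String) extends Expr
case class In(left: Expr, tableName: String) extends Expr

object ToyInParser extends JavaTokenParsers with PackratParsers {
  lazy val fieldRef: PackratParser[Expr] = ident ^^ FieldRef
  // A Table object cannot occur in a string, so match a reference by name:
  lazy val tableRef: PackratParser[String] = ident
  lazy val in: PackratParser[Expr] =
    fieldRef ~ ("IN" ~> tableRef) ^^ { case l ~ t => In(l, t) }

  def main(args: Array[String]): Unit =
    println(parseAll(in, "a IN RightTable")) // parsed: In(FieldRef(a),RightTable)
}
```
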


[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473681#comment-15473681
 ] 

ASF GitHub Bot commented on FLINK-3660:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2386
  
@aljoscha I updated the pull request, which now builds green (just rebasing 
fixed the issues).
I also now keep metrics per operator, not only for the sinks.


> Measure latency of elements and expose it through web interface
> ---
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: pre-apache
>
>
> It would be nice to expose the end-to-end latency of a streaming job in the 
> webinterface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp 
> at the sources to each record.
> However, this introduces overhead for a monitoring feature users might not 
> even use (8 bytes for each element + System.currentTimeMilis() on each 
> element).
> Therefore, I suggest to implement this feature by periodically sending 
> special events, similar to watermarks through the topology. 
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources 
> and forwarded by the tasks. The sinks will compare the timestamp of the 
> latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be 
> delayed similarly than regular records, so their latency will approximate the 
> record latency.
> Above suggestion expects the clocks on all taskmanagers to be in sync. 
> Otherwise, the measured latencies would also include the offsets between the 
> taskmanager's clocks.
> In a second step, we can try to mitigate the issue by using the JobManager as 
> a central timing service. The TaskManagers will periodically query the JM for 
> the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM but it 
> would still lead to reasonably good estimations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
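
A toy model of the scheme the issue describes, with invented names
(`LatencyMark`, `SinkSide`) rather than the PR's actual classes: the source
stamps its wall-clock time into a marker, and the sink subtracts that stamp
from its own clock, so the estimate silently includes any clock offset between
the two machines, exactly as the description cautions.

```
// Toy model, not Flink code; all names are invented for illustration.
case class LatencyMark(emittedAtMillis: Long, sourceId: Int)

object SinkSide {
  // Sink-side estimate; assumes source and sink clocks are roughly in sync.
  def latencyOf(mark: LatencyMark): Long =
    System.currentTimeMillis() - mark.emittedAtMillis

  def main(args: Array[String]): Unit = {
    val mark = LatencyMark(System.currentTimeMillis() - 42, sourceId = 0)
    println(s"approx. latency: ${latencyOf(mark)} ms")
  }
}
```
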

