[jira] [Commented] (FLINK-4498) Better Cassandra sink documentation

2017-09-15 Thread Michael Fong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168827#comment-16168827
 ] 

Michael Fong commented on FLINK-4498:
-

I would like to help update the documentation for the Cassandra connector and provide 
more concrete (and meaningful) examples. 
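
For instance, a minimal tuple-stream example could look like the following (a 
sketch against the 1.x connector API; host, keyspace/table and the input data 
are placeholders):

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Long>> results =
    env.fromElements(Tuple2.of("flink", 1L), Tuple2.of("cassandra", 2L));

CassandraSink.addSink(results)
    // setQuery() is required for (Java) tuple streams; POJO streams are
    // instead mapped through DataStax Mapper annotations and need no query
    .setQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .build();

env.execute("Cassandra sink example");
{code}

Note that the tuple fields are bound using the DataStax driver's default 
encoders, so e.g. a blob column requires a java.nio.ByteBuffer field rather 
than a byte[].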

> Better Cassandra sink documentation
> ---
>
> Key: FLINK-4498
> URL: https://issues.apache.org/jira/browse/FLINK-4498
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector, Documentation
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>Assignee: Michael Fong
>
> The Cassandra sink documentation is somewhat muddled and could be improved.  
> For instance, the fact that it only supports tuples and POJOs that use 
> DataStax Mapper annotations is only mentioned in passing, and it is not clear 
> that the reference to tuples applies only to Flink Java tuples and not Scala 
> tuples.  
> The documentation also does not mention that setQuery() is only necessary for 
> tuple streams. 
> The explanation of the write ahead log could use some cleaning up to clarify 
> when it is appropriate to use, ideally with an example.  Maybe this would be 
> best as a blog post to expand on the type of non-deterministic streams this 
> applies to.
> It would also be useful to mention that tuple elements will be mapped to 
> Cassandra columns using the Datastax Java driver's default encoders, which 
> are somewhat limited (e.g. to write to a blob column the type in the tuple 
> must be a java.nio.ByteBuffer and not just a byte[]).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-09-15 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168733#comment-16168733
 ] 

Elias Levy commented on FLINK-7266:
---

I am curious what the state of this is.  It is still a problem on 1.3.2, making 
use of S3 with the file system state backend very imprudent in production.  You 
end up with thousands of empty "directories" in S3 for the checkpoints:

{code}
$ sudo aws s3 ls --recursive s3://bucket/flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/
2017-09-15 23:03:15  0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-1/
2017-09-15 23:04:15  0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-10/
2017-09-15 23:14:07  0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-100/
2017-09-15 23:14:14  0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-101/
2017-09-15 23:14:20  0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-102/
2017-09-15 23:15:12  0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-103/
2017-09-15 23:15:18  0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-104/
...
{code}

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.4.0, 1.3.2
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.
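>
> A sketch of the kind of guard this implies (illustrative only; the method 
> shape and {{isObjectStore()}} are hypothetical, not the actual Flink code):
> {code}
> // Hypothetical sketch: delete the state file, but attempt parent-directory
> // cleanup only on real file systems. Object stores like S3 have no true
> // directories, and the extra list/delete calls are expensive and can get
> // the JobManager throttled during checkpoint cleanup.
> void discardState(FileSystem fs, Path statePath) throws IOException {
>   fs.delete(statePath, false);
>   if (!isObjectStore(fs)) {  // hypothetical predicate
>     Path parent = statePath.getParent();
>     FileStatus[] rest = fs.listStatus(parent);
>     if (rest != null && rest.length == 0) {
>       fs.delete(parent, false);
>     }
>   }
> }
> {code}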



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168489#comment-16168489
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4562


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
> Fix For: 1.4.0
>
>
> Here is the null check in the finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without a guard.
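>
> The fix, in pattern form: allocate inside the try and dereference the buffer 
> only after a successful assignment, so the null check in the finally block 
> becomes meaningful. An illustrative sketch (allocateBuffer(), allocator and 
> payload mirror the snippet above and are placeholders):
> {code}
> ByteBuf buffer = null;
> try {
>   buffer = allocateBuffer(allocator);  // may throw; buffer stays null then
>   buffer.writeInt(payload);            // dereference only after assignment
> }
> finally {
>   if (buffer != null) {                // effective: buffer is null only if
>     buffer.recycle();                  // allocation itself failed
>   }
> }
> {code}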



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7625) typo in docs metrics sections

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168490#comment-16168490
 ] 

ASF GitHub Bot commented on FLINK-7625:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4676


>  typo in docs metrics sections
> --
>
> Key: FLINK-7625
> URL: https://issues.apache.org/jira/browse/FLINK-7625
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Metrics
>Affects Versions: 1.3.2
>Reporter: Hai Zhou
>Assignee: Hai Zhou
> Fix For: 1.4.0, 1.3.3
>
>
> In the docs' metrics table (columns: Infix, Metrics), the row
> Status.JVM.Memory | *Memory.Heap.Used*
> should be changed to
> Status.JVM.Memory | *Heap.Used*



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7494) No license headers in ".travis.yml" file

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168491#comment-16168491
 ] 

ASF GitHub Bot commented on FLINK-7494:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4575


> No license headers in ".travis.yml" file
> 
>
> Key: FLINK-7494
> URL: https://issues.apache.org/jira/browse/FLINK-7494
> Project: Flink
>  Issue Type: Wish
>  Components: Travis
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>
> I will fix the ".travis.yml" file.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4575: [FLINK-7494][travis] Add license headers to '.trav...

2017-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4575


---


[GitHub] flink pull request #4676: [FLINK-7625] Fix typo in metrics part of documents

2017-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4676


---


[GitHub] flink pull request #4641: [hotfix][docs] Fix a typo on log name for quick st...

2017-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4641


---


[jira] [Updated] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-09-15 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-7402:
--
Fix Version/s: 1.4.0

> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
> Fix For: 1.4.0
>
>
> Here is the null check in the finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without a guard.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...

2017-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4562


---


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168434#comment-16168434
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r139237307
  
--- Diff: flink-core/pom.xml ---
@@ -80,6 +80,13 @@ under the License.


 
+   
+   
+   org.apache.commons
--- End diff --

Thanks. Use java.util.Map instead.  


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-15 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r139237307
  
--- Diff: flink-core/pom.xml ---
@@ -80,6 +80,13 @@ under the License.


 
+   
+   
+   org.apache.commons
--- End diff --

Thanks. Use java.util.Map instead.  


---


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168430#comment-16168430
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r139237144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ---
@@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 canonize(relType)
   }
 
+  override def createMultisetType(elementType: RelDataType, 
maxCardinality: Long): RelDataType = {
+val relType = new MultisetRelDataType(
--- End diff --

Added changes in FlinkRelNode & ExpressionReducer


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-15 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r139237144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ---
@@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 canonize(relType)
   }
 
+  override def createMultisetType(elementType: RelDataType, 
maxCardinality: Long): RelDataType = {
+val relType = new MultisetRelDataType(
--- End diff --

Added changes in FlinkRelNode & ExpressionReducer


---


[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168414#comment-16168414
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4562
  
Merging ...


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in the finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without a guard.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4562: [FLINK-7402] Fix ineffective null check in NettyMessage#w...

2017-09-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4562
  
Merging ...


---


[jira] [Commented] (FLINK-7625) typo in docs metrics sections

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168395#comment-16168395
 ] 

ASF GitHub Bot commented on FLINK-7625:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4676
  
Merging ...


>  typo in docs metrics sections
> --
>
> Key: FLINK-7625
> URL: https://issues.apache.org/jira/browse/FLINK-7625
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Metrics
>Affects Versions: 1.3.2
>Reporter: Hai Zhou
>Assignee: Hai Zhou
> Fix For: 1.4.0, 1.3.3
>
>
> In the docs' metrics table (columns: Infix, Metrics), the row
> Status.JVM.Memory | *Memory.Heap.Used*
> should be changed to
> Status.JVM.Memory | *Heap.Used*



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4676: [FLINK-7625] Fix typo in metrics part of documents

2017-09-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4676
  
Merging ...


---


[GitHub] flink issue #4575: [FLINK-7494][travis] Add license headers to '.travis.yml'...

2017-09-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4575
  
Merging ...


---


[jira] [Commented] (FLINK-7494) No license headers in ".travis.yml" file

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168337#comment-16168337
 ] 

ASF GitHub Bot commented on FLINK-7494:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4575
  
Merging ...


> No license headers in ".travis.yml" file
> 
>
> Key: FLINK-7494
> URL: https://issues.apache.org/jira/browse/FLINK-7494
> Project: Flink
>  Issue Type: Wish
>  Components: Travis
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>
> I will fix the ".travis.yml" file.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4641: [hotfix][docs] Fix a typo on log name for quick start gui...

2017-09-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4641
  
Merging ...


---


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-15 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168158#comment-16168158
 ] 

Kostas Kloudas commented on FLINK-7606:
---

Now that I think about it, can you check your watermarks and their values, and 
whether the watermark advances past the 10 min + 10 sec (your bounded 
out-of-orderness) in event time?

If the watermark does not advance, then the patterns do not time out.
This, in combination with a potential re-ordering of events (as I described 
before), can lead to the NFA always having some stale "Idle" elements that are 
waiting for their corresponding "Start", which never arrives because it was 
dropped as late.

To verify this, could you set the parallelism of your job to 1, make sure that 
"Idle" precedes "Start", and have your source send a watermark with a high 
timestamp at the end of the iteration, so that all buffered elements are 
flushed or timed out? This is a very controlled experiment, just to see where 
we stand.

It would also be useful to see the timestamp generator for your input 
elements, to understand how time and the watermark advance. Then I could give 
you more detailed answers.
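
Roughly, such a controlled source could look like this (a sketch; the Event 
type and the timestamps are placeholders):

{code}
// Sketch: parallelism 1, "Idle" before "Start", then a watermark far in the
// future so everything still buffered must either match or time out.
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = env.addSource(new SourceFunction<Event>() {
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        ctx.collectWithTimestamp(new Event("Idle"), 1000L);
        ctx.collectWithTimestamp(new Event("Start"), 2000L);
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
    }

    @Override
    public void cancel() {}
});
{code}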

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated) but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.
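>
> For reference, the job shape described above corresponds roughly to the 
> following (a sketch; the Message type, key selector and conditions are 
> assumptions):
> {code}
> Pattern<Message, ?> pattern = Pattern.<Message>begin("first")
>   .where(new SimpleCondition<Message>() {
>     @Override
>     public boolean filter(Message m) { return m.isFirst(); }
>   })
>   .next("second")
>   .where(new SimpleCondition<Message>() {
>     @Override
>     public boolean filter(Message m) { return m.isSecond(); }
>   })
>   .within(Time.minutes(5));  // the "within" clause described above
>
> PatternStream<Message> matches =
>   CEP.pattern(messages.keyBy(m -> m.getKey()), pattern);
> {code}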



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-09-15 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168099#comment-16168099
 ] 

Kostas Kloudas edited comment on FLINK-7606 at 9/15/17 4:38 PM:


Also, on another note, I read that you have fewer results than expected. 

Could you check if some elements are dropped as late and if some timed-out 
patterns are emitted?

As stated in the documentation, the CEP library assumes correctness of the 
watermark. So, any of the above can happen because of the different parallel 
instances sending elements at different rates (so elements can be re-ordered) 
in combination with an unfortunate watermark alignment. 


was (Author: kkl0u):
Also, on another note, I read that you have fewer results than expected. 

Could you check if some of these patterns are emitted as timed out patterns? In 
essence, could you check if timed-out patterns + successful ones == 3. 

This could happen if, for example, a "Start" arrives before the "Idle" (due to 
parallelism), and there is a watermark in between them whose timestamp is 
greater than that of the "Idle" element. In this case, the "Idle" will be 
dropped as late and the pattern will time out. This is because, as stated in 
the documentation, the CEP library assumes correctness of the watermark.

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated) but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-15 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168099#comment-16168099
 ] 

Kostas Kloudas commented on FLINK-7606:
---

Also, on another note, I read that you have fewer results than expected. 

Could you check if some of these patterns are emitted as timed out patterns? In 
essence, could you check if timed-out patterns + successful ones == 3. 

This could happen if, for example, a "Start" arrives before the "Idle" (due to 
parallelism), and there is a watermark in between them whose timestamp is 
greater than that of the "Idle" element. In this case, the "Idle" will be 
dropped as late and the pattern will time out. This is because, as stated in 
the documentation, the CEP library assumes correctness of the watermark.

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated) but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-15 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168090#comment-16168090
 ] 

Kostas Kloudas commented on FLINK-7606:
---

Yes [~i...@paolorendano.it]. This holds for every keyed state. 
The amount of state you store is proportional to the number of active keys that 
you expect to have. 

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated) but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7410) Use toString method to display operator names for UserDefinedFunction

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168072#comment-16168072
 ] 

ASF GitHub Bot commented on FLINK-7410:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r139187009
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
 ---
@@ -41,7 +41,7 @@ abstract class UserDefinedFunction extends Serializable {
   def close(): Unit = {}
 
   /**
-* @return true iff a call to this function is guaranteed to always 
return
+* @return true if a call to this function is guaranteed to always 
return
--- End diff --

We should be writing out "if and only if" iff that is the intent.


> Use toString method to display operator names for UserDefinedFunction
> -
>
> Key: FLINK-7410
> URL: https://issues.apache.org/jira/browse/FLINK-7410
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> *Motivation*
> Operator names set in the Table API are used for visualization and logging, so 
> it is important to make these names simple and readable. Currently, a 
> UserDefinedFunction's name contains the class canonical name and an md5 value, 
> making the name too long and unfriendly to users. 
> As shown in the following example, 
> {quote}
> select: (a, b, c, 
> org$apache$flink$table$expressions$utils$RichFunc1$281f7e61ec5d8da894f5783e2e17a4f5(a)
>  AS _c3, 
> org$apache$flink$table$expressions$utils$RichFunc2$fb99077e565685ebc5f48b27edc14d98(c)
>  AS _c4)
> {quote}
> *Changes:*
>   
> Use the {{toString}} method to display operator names for UserDefinedFunction. 
> The method returns the class name by default. Users can also override the 
> method to return whatever they want.
> What do you think [~fhueske] ?
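>
> From the user's side, the change would look roughly like this (a sketch; the 
> class and its eval() are illustrative):
> {code}
> public class RichFunc1 extends ScalarFunction {
>   public int eval(int a) {
>     return a + 1;
>   }
>
>   @Override
>   public String toString() {
>     // used for operator names in plans and logs, replacing the
>     // canonical-name-plus-md5 identifier shown above
>     return "RichFunc1";
>   }
> }
> {code}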



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4624: [FLINK-7410] [table] Use toString method to displa...

2017-09-15 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4624#discussion_r139187009
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
 ---
@@ -41,7 +41,7 @@ abstract class UserDefinedFunction extends Serializable {
   def close(): Unit = {}
 
   /**
-* @return true iff a call to this function is guaranteed to always 
return
+* @return true if a call to this function is guaranteed to always 
return
--- End diff --

We should be writing out "if and only if" iff that is the intent.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-15 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r139186699
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 ---
@@ -115,6 +115,13 @@ public Configuration getConfiguration() {
return tmpDirectories;
}
 
+   public String getFirstTmpDirectory(){
--- End diff --

Just now starting to catch up but the use of the temporary file was the 
only issue I had when using the original PR. Great to see it's already fixed!


---


[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168055#comment-16168055
 ] 

ASF GitHub Bot commented on FLINK-6864:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4574#discussion_r139185112
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1900,7 +1900,8 @@ private boolean isValidPojoField(Field f, Class 
clazz, ArrayList typeHi
ParameterizedType parameterizedType, 
TypeInformation in1Type, TypeInformation in2Type) {
 
if (!Modifier.isPublic(clazz.getModifiers())) {
-   LOG.info("Class " + clazz.getName() + " is not public, 
cannot treat it as a POJO type. Will be handled as GenericType");
+   LOG.info("Class " + clazz.getName() + " is not public, 
cannot treat it as a POJO type. " +
+   "Will be handled as GenericType, and may lose 
some serialization performance.");
--- End diff --

What if we said something like "... is not public and cannot be processed 
as a POJO type. Please read the Flink documentation on "Data Types & 
Serialization" for details on the impact to performance."


> Remove confusing "invalid POJO type" messages from TypeExtractor
> 
>
> Key: FLINK-6864
> URL: https://issues.apache.org/jira/browse/FLINK-6864
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will 
> log warnings such as ".. must have a default constructor to be used as a 
> POJO." or "... is not a valid POJO type because not all fields are valid POJO 
> fields." in the {{analyzePojo}} method.
> These messages are often perceived as misleading, making the user think that 
> the job should have failed, whereas in fact Flink just falls back to Kryo and 
> treats these types as generic types. We should remove these messages, and at 
> the same time improve the type serialization docs at [1] to explicitly 
> explain what it means when Flink does / does not recognize a user type as a 
> POJO.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168056#comment-16168056
 ] 

ASF GitHub Bot commented on FLINK-6864:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4574#discussion_r139184023
  
--- Diff: docs/dev/types_serialization.md ---
@@ -115,6 +115,8 @@ conditions are fulfilled:
   or have a public getter- and a setter- method that follows the Java beans
   naming conventions for getters and setters.
 
+Note that when a data type can't be recognized as a POJO type, it will be 
handled as GenericType.
--- End diff --

"... it will be processed as a GenericType and serialized with Kryo"? Is it 
generally understood that use of Kryo can have a significant impact on 
performance? It's not just serialization which is slower. It looks like if the 
class does not implement `NormalizedKey` then objects are deserialized for each 
comparison.
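
To make the distinction concrete, a pair of examples (class names made up): 
the first type satisfies the POJO rules, the second is not public and 
therefore falls back to a GenericType serialized with Kryo:

{code}
// Satisfies the POJO rules: public class, public no-argument constructor,
// public (or getter/setter-accessible) fields -> Flink's POJO serializer.
public class ValidPojo {
    public String name;
    public int count;

    public ValidPojo() {}
}

// Not public -> cannot be analyzed as a POJO; handled as a GenericType and
// serialized with Kryo, with the performance caveats discussed above.
class NotAPojo {
    private final String name;

    NotAPojo(String name) { this.name = name; }
}
{code}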


> Remove confusing "invalid POJO type" messages from TypeExtractor
> 
>
> Key: FLINK-6864
> URL: https://issues.apache.org/jira/browse/FLINK-6864
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will 
> log warnings such as ".. must have a default constructor to be used as a 
> POJO." or "... is not a valid POJO type because not all fields are valid POJO 
> fields." in the {{analyzePojo}} method.
> These messages are often perceived as misleading, making the user think that 
> the job should have failed, whereas in fact Flink just falls back to Kryo and 
> treats these types as generic types. We should remove these messages, and at 
> the same time improve the type serialization docs at [1] to explicitly 
> explain what it means when Flink does / does not recognize a user type as a 
> POJO.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4574: [FLINK-6864] Fix confusing "invalid POJO type" mes...

2017-09-15 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4574#discussion_r139185112
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java 
---
@@ -1900,7 +1900,8 @@ private boolean isValidPojoField(Field f, Class 
clazz, ArrayList typeHi
ParameterizedType parameterizedType, 
TypeInformation in1Type, TypeInformation in2Type) {
 
if (!Modifier.isPublic(clazz.getModifiers())) {
-   LOG.info("Class " + clazz.getName() + " is not public, 
cannot treat it as a POJO type. Will be handled as GenericType");
+   LOG.info("Class " + clazz.getName() + " is not public, 
cannot treat it as a POJO type. " +
+   "Will be handled as GenericType, and may lose 
some serialization performance.");
--- End diff --

What if we said something like "... is not public and cannot be processed 
as a POJO type. Please read the Flink documentation on "Data Types & 
Serialization" for details on the impact to performance."


---


[GitHub] flink pull request #4574: [FLINK-6864] Fix confusing "invalid POJO type" mes...

2017-09-15 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4574#discussion_r139184023
  
--- Diff: docs/dev/types_serialization.md ---
@@ -115,6 +115,8 @@ conditions are fulfilled:
   or have a public getter- and a setter- method that follows the Java beans
   naming conventions for getters and setters.
 
+Note that when a data type can't be recognized as a POJO type, it will be 
handled as GenericType.
--- End diff --

"... it will be processed as a GenericType and serialized with Kryo"? Is it 
generally understood that use of Kryo can have a significant impact on 
performance? It's not just serialization which is slower. It looks like if the 
class does not implement `NormalizedKey` then objects are deserialized for each 
comparison.


---


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-15 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168025#comment-16168025
 ] 

Paolo Rendano commented on FLINK-7606:
--

Hi [~kkl0u],
from your comment I understand that the baseline memory (with a minimal load of 
messages) I have to provision for a single instance is directly related to the 
number of keys I want to be able to manage. Did I understand that correctly?

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows up continuously without free the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set as 5 minutes), indicating that two 
> incoming messages match the pattern only if they come in a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows up (I suppose that the state 
> of NFA is updated) but it is not cleaned also after the 5 minutes of time 
> window defined in "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7273) Gelly tests with empty graphs

2017-09-15 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-7273.
-
Resolution: Fixed

master: 9437a0ffc04318f6a1a2d19c59f2ae6651b26507

> Gelly tests with empty graphs
> -
>
> Key: FLINK-7273
> URL: https://issues.apache.org/jira/browse/FLINK-7273
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.4.0
>
>
> There exist some tests with empty graphs, but the `EmptyGraph` in 
> `AsmTestBase` contained vertices and no edges. Add a new `EmptyGraph` without 
> vertices and test both empty graphs for each algorithm.
> `PageRank` should (optionally?) include zero-degree vertices in the results.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7199) Graph simplification does not set parallelism

2017-09-15 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-7199:
--
Fix Version/s: 1.4.0

> Graph simplification does not set parallelism
> -
>
> Key: FLINK-7199
> URL: https://issues.apache.org/jira/browse/FLINK-7199
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.4.0
>
>
> The {{Simplify}} parameter should accept and set the parallelism when calling 
> the {{Simplify}} algorithms.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7199) Graph simplification does not set parallelism

2017-09-15 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-7199.
-
Resolution: Fixed

master: 2ac09c084320f1803c623c719dca8b4776c8110f

> Graph simplification does not set parallelism
> -
>
> Key: FLINK-7199
> URL: https://issues.apache.org/jira/browse/FLINK-7199
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.4.0
>
>
> The {{Simplify}} parameter should accept and set the parallelism when calling 
> the {{Simplify}} algorithms.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7447) Hope add more committer information to "Community & Project Info" page.

2017-09-15 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167976#comment-16167976
 ] 

Greg Hogan commented on FLINK-7447:
---

The timezone is of little benefit for real-time communication without knowing a 
developer's availability regarding schedule, holidays, work tasks, etc. If 
timezones imply an obligation or expectation that committers are reachable at 
certain times, then we should continue to simply provide committer emails 
(@apache.org) and direct users and developers to the mailing lists.

> Hope add more committer information to "Community & Project Info" page.
> ---
>
> Key: FLINK-7447
> URL: https://issues.apache.org/jira/browse/FLINK-7447
> Project: Flink
>  Issue Type: Wish
>  Components: Project Website
>Reporter: Hai Zhou
>
> I would like to add "organization" and "time zone" information to the 
> committer introductions, and use email addresses instead of Apache IDs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7563) Fix watermark semantics in CEP operators

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167967#comment-16167967
 ] 

ASF GitHub Bot commented on FLINK-7563:
---

Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4632
  
I added the test case to the CEP operator tests and addressed the review 
comments. Thanks so much.


> Fix watermark semantics in CEP operators
> 
>
> Key: FLINK-7563
> URL: https://issues.apache.org/jira/browse/FLINK-7563
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Aljoscha Krettek
>Assignee: Yueting Chen
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> See 
> https://lists.apache.org/thread.html/3541e72ba3842192e58a487e54c2817f6b2b9d12af5fee97af83e5df@%3Cdev.flink.apache.org%3E
>  for reference.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4632: [FLINK-7563] [cep] Fix watermark semantics in cep and rel...

2017-09-15 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4632
  
I added the test case to the CEP operator tests and addressed the review 
comments. Thanks so much.


---


[jira] [Commented] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-15 Thread Wei-Che Wei (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167955#comment-16167955
 ] 

Wei-Che Wei commented on FLINK-7630:


Hi [~aljoscha]
FYI. If there is any feedback, please let me know. Thank you.

> Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()
> 
>
> Key: FLINK-7630
> URL: https://issues.apache.org/jira/browse/FLINK-7630
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
> From this discussion, it seems that the current functionality of 
> {{ParameterTool.fromPropertiesFile}} is not enough.
> It would be more useful if {{ParameterTool.fromPropertiesFile}} also accepted 
> other parameter types, such as {{File}} and {{InputStream}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-15 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167954#comment-16167954
 ] 

Kostas Kloudas commented on FLINK-7606:
---

It should be cleared. 

In event time we do not have timers; instead, at each watermark we check 
whether the NFA is empty, and if it is, the state is cleared up. In processing 
time, it is true that we are always waiting for the "next" element, although 
this is going to change in 1.4.

I am having a look at the code and I will come back as soon as I have 
something. 
Sorry [~matteoferrario29] for the delayed answer, but this week has been a bit 
busy.
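
Loosely, the event-time path resembles the following (a rough sketch, not the 
actual operator code; {{processMatchesUpTo()}} and the state handle are 
placeholders):

{code}
// Rough sketch of the watermark-driven cleanup described above.
void onWatermark(Watermark mark) throws Exception {
    NFA<Event> nfa = nfaState.value();
    processMatchesUpTo(nfa, mark.getTimestamp()); // fire or time out matches
    if (nfa.isEmpty()) {
        nfaState.clear();  // no partial matches left: release the keyed state
    } else {
        nfaState.update(nfa);
    }
}
{code}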

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state of the 
> NFA is updated) but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7630) Allow passing a File or an InputStream to ParameterTool.fromPropertiesFile()

2017-09-15 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-7630:
--

 Summary: Allow passing a File or an InputStream to 
ParameterTool.fromPropertiesFile()
 Key: FLINK-7630
 URL: https://issues.apache.org/jira/browse/FLINK-7630
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei


http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-user-ParameterTool-fromPropertiesFile-to-get-resource-file-inside-my-jar-tp15482.html
From this discussion, it seems that the current functionality of 
{{ParameterTool.fromPropertiesFile}} is not enough.
It would be more useful if {{ParameterTool.fromPropertiesFile}} also accepted 
other parameter types, such as {{File}} and {{InputStream}}.
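
In use, the proposed overloads would look roughly like this (hypothetical 
signatures; only the {{String}} path variant exists today, and {{MyJob}} is a 
placeholder):

{code}
// Hypothetical overloads proposed here, shown from the caller's side.
ParameterTool fromFile =
    ParameterTool.fromPropertiesFile(new File("conf/job.properties"));

// Reading a properties file bundled inside the job jar:
ParameterTool fromStream = ParameterTool.fromPropertiesFile(
    MyJob.class.getResourceAsStream("/job.properties"));
{code}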



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4346: [FLINK-7199] [gelly] Graph simplification does not...

2017-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4346


---


[jira] [Commented] (FLINK-7199) Graph simplification does not set parallelism

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167895#comment-16167895
 ] 

ASF GitHub Bot commented on FLINK-7199:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4346


> Graph simplification does not set parallelism
> -
>
> Key: FLINK-7199
> URL: https://issues.apache.org/jira/browse/FLINK-7199
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> The {{Simplify}} parameter should accept and set the parallelism when calling 
> the {{Simplify}} algorithms.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4405: [FLINK-7273] [gelly] Gelly tests with empty graphs

2017-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4405


---


[jira] [Commented] (FLINK-7273) Gelly tests with empty graphs

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167896#comment-16167896
 ] 

ASF GitHub Bot commented on FLINK-7273:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4405


> Gelly tests with empty graphs
> -
>
> Key: FLINK-7273
> URL: https://issues.apache.org/jira/browse/FLINK-7273
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.4.0
>
>
> There exist some tests with empty graphs, but the `EmptyGraph` in 
> `AsmTestBase` contained vertices and no edges. Add a new `EmptyGraph` without 
> vertices and test both empty graphs for each algorithm.
> `PageRank` should (optionally?) include zero-degree vertices in the results.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139111867
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped {@link java.lang.Integer}. Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and a
+   

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167875#comment-16167875
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139149693
  
--- Diff: 
flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
 ---
@@ -49,6 +51,8 @@ object WordCount {
 
 val params: ParameterTool = ParameterTool.fromArgs(args)
 
+val t = Types.of[Either[String, Nothing]]
--- End diff --

What's the purpose of this change?


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add helpers for all built-in Flink types to {{Types}}, such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type, and should especially help with creating {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.
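>
> In use, the helpers would look roughly like this (a sketch based on the list 
> above; {{MyPojo}} and the input stream are placeholders):
> {code}
> TypeInformation<Tuple2<String, Integer>> tupleInfo =
>   Types.TUPLE(Types.STRING, Types.INT);
> TypeInformation<Map<String, Long>> mapInfo =
>   Types.MAP(Types.STRING, Types.LONG);
> TypeInformation<MyPojo> pojoInfo = Types.POJO(MyPojo.class);
>
> // typical use: pin down a return type the extractor cannot infer
> // (words is a DataStream<String>)
> DataStream<Tuple2<String, Integer>> counts =
>   words.map(w -> Tuple2.of(w, 1)).returns(tupleInfo);
> {code}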



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167873#comment-16167873
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139112514
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167878#comment-16167878
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139112027
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167877#comment-16167877
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139148230
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167871#comment-16167871
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r138918828
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
--- End diff --

`(or would result in an inefficient type)`?


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}} class, such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type and, especially in the case of POJOs, should help with creating a {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r138918828
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
--- End diff --

`(or would result in an inefficient type)`?


---
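As an aside on the null-support notes in the quoted Javadoc ("Supports a null value" vs. "Does not support a null value"): the difference becomes visible when serializing a null directly. Below is a minimal sketch against Flink's existing serializer API; the behavior shown is what the Javadoc claims, illustrated here only for STRING and INT.

{code}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class NullSupportSketch {

    public static void main(String[] args) throws Exception {
        ExecutionConfig config = new ExecutionConfig();
        DataOutputSerializer out = new DataOutputSerializer(64);

        // STRING supports null: the serializer has an explicit null encoding.
        TypeSerializer<String> stringSerializer =
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(config);
        stringSerializer.serialize(null, out); // succeeds

        // INT does not support null: unboxing the null Integer fails.
        TypeSerializer<Integer> intSerializer =
            BasicTypeInfo.INT_TYPE_INFO.createSerializer(config);
        try {
            intSerializer.serialize(null, out);
        } catch (NullPointerException e) {
            System.out.println("INT does not support a null value");
        }
    }
}
{code}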


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167870#comment-16167870
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r138921417
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
--- End diff --

`Returns type information for both primitive byte and {@link 
java.lang.Byte}.` (drop `a` and `wrapped`, as for `STRING`). IMO, "wrapped 
java.lang.Byte" would mean that `Byte` is wrapped, not that it wraps a `byte`.


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}} class, such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167872#comment-16167872
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139113529
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167876#comment-16167876
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139111867
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167881#comment-16167881
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139109060
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139113529
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped {@link java.lang.Integer}. Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and a
+   

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139109060
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped {@link java.lang.Integer}. Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and a
+   

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r138919099
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
--- End diff --

wouldn't `where the extraction is not possible` be a case `where type 
information has to be supplied manually`?


---
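To make the distinction in the comment above concrete: a common case where extraction is not possible is a Java 8 lambda whose generic return type is erased, so the type has to be supplied manually via {{returns()}}. A minimal sketch using the standard DataStream API follows; the commented alternative uses the helpers added by this PR.

{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReturnsSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> counts = env
            .fromElements("a", "b", "a")
            // The lambda's Tuple2 type parameters are erased at runtime, so
            // extraction fails and the return type must be declared manually.
            .map(word -> new Tuple2<>(word, 1))
            .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            // Alternative with this PR's helpers:
            // .returns(Types.TUPLE(Types.STRING, Types.INT));

        counts.print();
        env.execute("manual type information sketch");
    }
}
{code}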


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139112514
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped {@link java.lang.Integer}. Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and a
+   

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139150623
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/Types.scala 
---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.api.scala.typeutils._
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.Either
+import _root_.scala.util.Try
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where the 
extraction is not possible
+  * (or inefficient) as well as cases where type information has to be 
supplied manually.
+  *
+  * Depending on the API you are using (e.g. Java API or Table API), there 
might be a more
--- End diff --

Remove this comment?


---


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167866#comment-16167866
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139109685
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167868#comment-16167868
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r138919667
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
--- End diff --

How about `Please note that the Scala API and Table API provide more 
specialized Types classes`, maybe linking to these classes (or providing 
their full names)?


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type. And especially in the case of POJOs, they should help create {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
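
For readers following along, here is a minimal sketch (editorial, not part of 
the thread) of how the proposed helpers would sit next to the {{TypeHint}} 
alternative that the Javadoc mentions. The exact {{Types.TUPLE(...)}} signature 
is an assumption based on the FLINK-7452 proposal and may differ in the merged 
version:

{code}
// Minimal editorial sketch (not from the thread). Assumes the Types.TUPLE(...)
// helper signature proposed in FLINK-7452; the merged API may differ.
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypesVsTypeHint {

    public static void main(String[] args) {
        // Explicit helper methods: no reflective analysis of generic signatures.
        TypeInformation<Tuple2<String, Integer>> viaTypes =
                Types.TUPLE(Types.STRING, Types.INT);

        // The TypeHint alternative mentioned in the Javadoc: the anonymous
        // subclass captures the generic signature for the type extractor.
        TypeInformation<Tuple2<String, Integer>> viaHint =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});

        // Both should describe the same Flink Java tuple type.
        System.out.println(viaTypes.equals(viaHint));
    }
}
{code}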


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139150559
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/Types.scala 
---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.api.scala.typeutils._
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.Either
+import _root_.scala.util.Try
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where the 
extraction is not possible
+  * (or inefficient) as well as cases where type information has to be 
supplied manually.
+  *
+  * Depending on the API you are using (e.g. Java API or Table API), there 
might be a more
+  * specialized `Types` class.
+  *
+  * Scala macros allow to determine type information of classes and type 
parameters. You can
+  * use [[Types.of]] to let type information be determined automatically.
+  */
+@PublicEvolving
+object Types {
--- End diff --

extend the Java `Types` class?


---


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139149693
  
--- Diff: 
flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
 ---
@@ -49,6 +51,8 @@ object WordCount {
 
 val params: ParameterTool = ParameterTool.fromArgs(args)
 
+val t = Types.of[Either[String, Nothing]]
--- End diff --

What's the purpose of this change?


---


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139148230
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped {@link java.lang.Integer}. Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and a
+   

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139112027
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped {@link java.lang.Integer}. Does not support a null value.
+*/
+   public static final TypeInformation INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and a
+   

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139150367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -20,60 +20,123 @@ package org.apache.flink.table.api
 import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, 
TypeInformation, Types => JTypes}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.types.Row
 
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
--- End diff --

Not sure if that's possible, but could the `Types` object extend the 
`JTypes` class such that we don't have to implement all methods again and 
forward the calls?


---


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r138921417
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
--- End diff --

`Returns type information for both primitive byte and {@link 
java.lang.Byte}.` (drop `a` and `wrapped`, as for `STRING`). IMO, "wrapped 
java.lang.Byte" would mean that `Byte` is wrapped, not that it wraps a `byte`.


---


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167874#comment-16167874
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139113352
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo SQL_DATE = 
SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo SQL_TIME = 
SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo SQL_TIMESTAMP = 
SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167869#comment-16167869
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r139150623
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/Types.scala 
---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => 
JTypes}
+import org.apache.flink.api.scala.typeutils._
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.util.Either
+import _root_.scala.util.Try
+
+/**
+  * This class gives access to the type information of the most common 
Scala types for which Flink
+  * has built-in serializers and comparators.
+  *
+  * This class contains types of 
[[org.apache.flink.api.common.typeinfo.Types]] and adds
+  * types for Scala specific classes (such as [[Unit]] or case classes).
+  *
+  * In many cases, Flink tries to analyze generic signatures of functions 
to determine return
+  * types automatically. This class is intended for cases where the 
extraction is not possible
+  * (or inefficient) as well as cases where type information has to be 
supplied manually.
+  *
+  * Depending on the API you are using (e.g. Java API or Table API), there 
might be a more
--- End diff --

Remove this comment?


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type. And especially in the case of POJOs, they should help create {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
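
To illustrate the {{Types.POJO(MyPojo.class)}} proposal above, a hedged sketch 
follows; {{MyPojo}} is a hypothetical example class, and the validation 
behavior is assumed from the issue description rather than from merged code:

{code}
// Minimal editorial sketch; MyPojo is a hypothetical class used only for
// illustration. Assumes the Types.POJO(Class) helper proposed above.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class PojoTypesExample {

    // A valid Flink POJO: public no-arg constructor and public fields
    // (or getters/setters).
    public static class MyPojo {
        public String name;
        public int count;

        public MyPojo() {}
    }

    public static void main(String[] args) {
        // Per the issue description, the helper should validate that the
        // class really is a POJO and fail otherwise, rather than silently
        // falling back to a generic type.
        TypeInformation<MyPojo> info = Types.POJO(MyPojo.class);
        System.out.println(info);
    }
}
{code}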


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167879#comment-16167879
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r138919099
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
--- End diff --

wouldn't `where the extraction is not possible` be a case `where type 
information has to be supplied manually`?


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type. And especially in the case of POJOs, they should help create {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4673: [hotfix] [cep] Fix afterMatchStrategy parameter missing i...

2017-09-15 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4673
  
Hi @dawidwys, thanks for the tip; that's what I tried to add. Sorry for 
the missing times().


---


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167804#comment-16167804
 ] 

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
Thanks! This doesn't help us with our situation, though.


> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>Reporter: Peter Ertl
>Assignee: Mikhail Lipkovich
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // do something silly just to get iteration going ...
> val result = env.fromElements(1, 2, 3).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
> })
> result.print()
> env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream 
> must match the parallelism of the original stream. Parallelism of original 
> stream: 1; parallelism of feedback stream: 8
>   at 
> org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at 
> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at 
> org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream setup I could imagine, this 
> error makes no sense to me :-P



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
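
Until the API change lands, the exception above can usually be avoided by 
giving the feedback operators the same parallelism as the original stream. 
The following is a hedged Java editorial sketch of that workaround (the 
operator chain and the use of {{Types.INT}} are illustrative, not from the 
thread):

{code}
// Editorial workaround sketch, assuming the Java DataStream API (the same
// check lives in FeedbackTransformation for both APIs): env.fromElements(...)
// has parallelism 1, so the feedback edge must also run with parallelism 1.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateParallelismWorkaround {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 2, 3); // parallelism 1
        IterativeStream<Integer> iteration = source.iterate();

        // Pin the feedback operators to the source parallelism to satisfy
        // the check in FeedbackTransformation.addFeedbackEdge().
        DataStream<Integer> feedback = iteration
                .filter(x -> x > 0).setParallelism(1)
                .map(x -> x - 1).returns(Types.INT).setParallelism(1);

        iteration.closeWith(feedback);
        iteration.filter(x -> x <= 0).print();

        env.execute("iterate parallelism workaround");
    }
}
{code}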


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167800#comment-16167800
 ] 

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
Thanks, Aljoscha. 
Probably I forgot to run `clean`. 
The method `iterate$default$3` is a method which is automatically created by 
Scala to compute the default value of the parameter `keepPartitioning`. I 
tried different ways to exclude it, but it didn't help. Anyway, it should 
somehow be tracked by japicmp, so I created an issue there:
https://github.com/siom79/japicmp/issues/176


> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>Reporter: Peter Ertl
>Assignee: Mikhail Lipkovich
>
> When I try to execute this simple snippet of code
> {code}
> @Test
> def iterateOnElements(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   // do something silly just to get iteration going ...
>   val result = env.fromElements(1, 2, 3).iterate(it => {
>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
>   })
>   result.print()
>   env.execute()
> }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream 
> must match the parallelism of the original stream. Parallelism of original 
> stream: 1; parallelism of feedback stream: 8
>   at 
> org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at 
> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at 
> org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream setup I could imagine, this 
> error makes no sense to me :-P
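The exception message itself points at a workaround: the feedback edge must 
have the same parallelism as the original stream. A minimal sketch of one way 
to achieve that (untested; the explicit {{setParallelism}} call on the last 
feedback operator is the assumption here, not part of the reporter's code):

{code:java}
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateParallelismWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> input = env.fromElements(1L, 2L, 3L);
        IterativeStream<Long> iteration = input.iterate();

        DataStream<Long> feedback = iteration
            .filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long x) { return x > 0; }
            })
            .map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long x) { return x - 1; }
            })
            // Pin the feedback edge to the parallelism of the original
            // stream (fromElements is a non-parallel source, i.e. 1), so
            // that closeWith() accepts the feedback edge.
            .setParallelism(input.getParallelism());

        iteration.closeWith(feedback);
        feedback.print();
        env.execute("iterate parallelism workaround");
    }
}
{code}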



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4655: [FLINK-7567]: Removed keepPartitioning parameter from ite...

2017-09-15 Thread mlipkovich
Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
Thanks, Aljoscha. 
Probably I had forgotten to run `clean`. 
The method `iterate$default$3` is generated automatically by Scala to compute 
the default value of the parameter `keepPartitioning`. I tried different ways 
to exclude it, but none of them helped. In any case it should somehow be 
tracked by japicmp, so I created an issue there: 
https://github.com/siom79/japicmp/issues/176


---


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167791#comment-16167791
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for 
> {{SingleInputGate}}. And the exclusive buffers are assigned to 
> {{InputChannel}}s directly.
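For illustration only, a drastically simplified sketch of what a fixed-size, 
non-rebalancing pool boils down to; the class name, the use of plain 
{{ByteBuffer}}, and the omitted listener mechanics are assumptions, not 
Flink's actual {{LocalBufferPool}}:

{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// A pool with a fixed number of buffers that are requested and recycled,
// but never redistributed to other pools.
final class FixedBufferPool {
    private final ArrayDeque<ByteBuffer> available = new ArrayDeque<>();

    FixedBufferPool(int numBuffers, int pageSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(ByteBuffer.allocateDirect(pageSize));
        }
    }

    // Returns null when all floating buffers are in use; the real
    // implementation would notify a registered listener instead.
    synchronized ByteBuffer requestBuffer() {
        return available.poll();
    }

    synchronized void recycle(ByteBuffer buffer) {
        buffer.clear();
        available.add(buffer);
    }
}
{code}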



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-09-15 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I have submitted the updates.


---


[jira] [Assigned] (FLINK-4498) Better Cassandra sink documentation

2017-09-15 Thread Michael Fong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Fong reassigned FLINK-4498:
---

Assignee: Michael Fong

> Better Cassandra sink documentation
> ---
>
> Key: FLINK-4498
> URL: https://issues.apache.org/jira/browse/FLINK-4498
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector, Documentation
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>Assignee: Michael Fong
>
> The Cassandra sink documentation is somewhat muddled and could be improved.  
> For instance, the fact that it only supports tuples and POJOs that use 
> DataStax Mapper annotations is only mentioned in passing, and it is not clear 
> that the reference to tuples only applies to Flink Java tuples and not Scala 
> tuples.  
> The documentation also does not mention that setQuery() is only necessary for 
> tuple streams. 
> The explanation of the write ahead log could use some cleaning up to clarify 
> when it is appropriate to use, ideally with an example.  Maybe this would be 
> best as a blog post to expand on the type of non-deterministic streams this 
> applies to.
> It would also be useful to mention that tuple elements will be mapped to 
> Cassandra columns using the Datastax Java driver's default encoders, which 
> are somewhat limited (e.g. to write to a blob column the type in the tuple 
> must be a java.nio.ByteBuffer and not just a byte[]).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-15 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4665
  
@aljoscha Yes, I reviewed the code today. It judges for each window whether 
the element is late, so the method I used before counted more lost data than 
there actually was. I have fixed the error and re-pushed; please help me 
review the code again, thanks.


---


[jira] [Created] (FLINK-7629) Scala stream aggregations should support nested field expressions

2017-09-15 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-7629:
--

 Summary: Scala stream aggregations should support nested field 
expressions
 Key: FLINK-7629
 URL: https://issues.apache.org/jira/browse/FLINK-7629
 Project: Flink
  Issue Type: Bug
  Components: Scala API, Streaming
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor
 Fix For: 1.4.0


In the Scala API, {{KeyedStream.maxBy}} and similar methods currently only work 
with a field name, and not with nested field expressions, such as 
"fieldA.fieldX". (This contradicts their documentation.)

The reason for this is that the string overload of {{KeyedStream.aggregate}} 
uses {{fieldNames2Indices}} and then calls the integer overload. Instead, it 
should create a {{SumAggregator}} or {{ComparableAggregator}} directly, as the 
integer overload does (and as the Java API does). The ctors of 
{{SumAggregator}} or {{ComparableAggregator}} will call 
{{FieldAccessorFactory.getAccessor}}, which will correctly handle a nested 
field expression.
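For comparison, a self-contained sketch of the behaviour the issue describes 
for the Java API (the POJOs are invented for illustration); with the proposed 
fix, the equivalent Scala calls would behave the same way:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NestedMaxBy {
    // Public fields and no-arg constructors make these valid Flink POJOs.
    public static class Inner {
        public double x;
        public Inner() {}
        public Inner(double x) { this.x = x; }
    }

    public static class Outer {
        public String key;
        public Inner inner;
        public Outer() {}
        public Outer(String key, Inner inner) { this.key = key; this.inner = inner; }
        @Override
        public String toString() { return key + " -> " + inner.x; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Outer("a", new Inner(1.0)), new Outer("a", new Inner(3.0)))
            .keyBy("key")
            .maxBy("inner.x") // nested field expression
            .print();

        env.execute("nested maxBy");
    }
}
{code}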



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167685#comment-16167685
 ] 

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
A local build of `mvn clean verify` fails for me because the japicmp plugin 
is complaining. The relevant section of the diff output from japicmp is
```
***! MODIFIED CLASS: PUBLIC org.apache.flink.streaming.api.scala.DataStream (not serializable)
---! REMOVED METHOD: PUBLIC(-) boolean iterate$default$3()
+++  NEW METHOD: PUBLIC(+) org.apache.flink.streaming.api.scala.DataStream setMaxParallelism(int)
***  MODIFIED ANNOTATION: scala.reflect.ScalaSignature
```

i.e. it's complaining about `iterate$default$3()`. The problem seems to be 
that Scala will generate some obfuscated method name for the `iterate()` method 
and the `@PublicEvolving` annotation is not properly applied to that. I tried 
playing around with the japicmp config in the root pom file but to no avail. 
Maybe you can find something that works.

What I added is this:
```
<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate$default$3()</exclude>
```

but it seems that doesn't work.


> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>Reporter: Peter Ertl
>Assignee: Mikhail Lipkovich
>
> When I try to execute this simple snippet of code
> {code}
> @Test
> def iterateOnElements(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   // do something silly just to get iteration going ...
>   val result = env.fromElements(1, 2, 3).iterate(it => {
>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
>   })
>   result.print()
>   env.execute()
> }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream 
> must match the parallelism of the original stream. Parallelism of original 
> stream: 1; parallelism of feedback stream: 8
>   at 
> org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at 
> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at 
> org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream 

[GitHub] flink issue #4655: [FLINK-7567]: Removed keepPartitioning parameter from ite...

2017-09-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
A local build of `mvn clean verify` fails for me because the japicmp plugin 
is complaining. The relevant section of the diff output from japicmp is
```
***! MODIFIED CLASS: PUBLIC org.apache.flink.streaming.api.scala.DataStream (not serializable)
---! REMOVED METHOD: PUBLIC(-) boolean iterate$default$3()
+++  NEW METHOD: PUBLIC(+) org.apache.flink.streaming.api.scala.DataStream setMaxParallelism(int)
***  MODIFIED ANNOTATION: scala.reflect.ScalaSignature
```

i.e. it's complaining about `iterate$default$3()`. The problem seems to be 
that Scala will generate some obfuscated method name for the `iterate()` method 
and the `@PublicEvolving` annotation is not properly applied to that. I tried 
playing around with the japicmp config in the root pom file but to no avail. 
Maybe you can find something that works.

What I added is this:
```
<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate$default$3()</exclude>
```

but it seems that doesn't work.


---


[jira] [Closed] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-4796.
---
Resolution: Won't Fix

Introducing a new SinkInterface will make the workaround of the hybrid producer 
unnecessary.

> Add new Sink interface with access to more meta data
> 
>
> Key: FLINK-4796
> URL: https://issues.apache.org/jira/browse/FLINK-4796
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which 
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other 
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} 
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter, 
> similar to {{ProcessFunction}}. This will allow sinks to query additional 
> meta data about the element that they're receiving. 
> This is one ML thread where a user ran into a problem caused by this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the 
> event timestamp easily. That's why the Kafka connector is using a custom 
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular 
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the 
> timestamp. 
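For illustration, a minimal sketch of what such a context-aware sink interface 
could look like; every name and signature below is an assumption made for the 
example, not a committed API:

{code:java}
// Hypothetical sink that, like ProcessFunction, receives a context with
// per-element meta data such as the event timestamp.
public interface SinkFunctionWithContext<IN> extends java.io.Serializable {

    void invoke(IN value, Context ctx) throws Exception;

    interface Context {
        /** Event timestamp of the current element, or null if none is set. */
        Long timestamp();

        /** Current processing time. */
        long currentProcessingTime();

        /** Current event-time watermark. */
        long currentWatermark();
    }
}
{code}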



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-4796:
-

> Add new Sink interface with access to more meta data
> 
>
> Key: FLINK-4796
> URL: https://issues.apache.org/jira/browse/FLINK-4796
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which 
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other 
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} 
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter, 
> similar to {{ProcessFunction}}. This will allow sinks to query additional 
> meta data about the element that they're receiving. 
> This is one ML thread where a user ran into a problem caused by this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the 
> event timestamp easily. That's why the Kafka connector is using a custom 
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular 
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the 
> timestamp. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7563) Fix watermark semantics in CEP operators

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167644#comment-16167644
 ] 

ASF GitHub Bot commented on FLINK-7563:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4632
  
Hi @yestinchen,
Thanks for working on this issue. Unfortunately your changes do not fix the 
semantics. Have a look at this failing test:

@Test
public void testCEPWatermarkSemantic() throws Exception {
    Event startEvent = new Event(42, "start", 1.0);
    SubEvent middleEvent = new SubEvent(42, "foo2", 2.0, 10.0);
    Event endEvent = new Event(42, "end", 1.0);

    SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator =
        getKeyedCepOperatorWithComparator(false);

    try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
             CepOperatorTestUtilities.getCepTestHarness(operator)) {
        harness.open();

        harness.processWatermark(0L);
        verifyWatermark(harness.getOutput().poll(), 0L);

        harness.processElement(new StreamRecord<>(startEvent, 1L));
        harness.processElement(new StreamRecord<>(middleEvent, 2L));
        harness.processElement(new StreamRecord<>(endEvent, 3L));

        assertTrue(operator.hasNonEmptyPQ(42));
        assertTrue(!operator.hasNonEmptyNFA(42));

        harness.processWatermark(3L);
        verifyWatermark(harness.getOutput().poll(), 3L);

        assertTrue(operator.hasNonEmptyPQ(42));
        assertTrue(operator.hasNonEmptyNFA(42));

        harness.processWatermark(4L);
        assertTrue(!operator.hasNonEmptyPQ(42));

        verifyPattern(harness.getOutput().poll(), startEvent, middleEvent, endEvent);
        verifyWatermark(harness.getOutput().poll(), 4L);
    }
}

To fix it you also have to fix the condition in 
`AbstractKeyedCEPPatternOperator::onEventTime:232`. Instead of:

while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {

it should be 

while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() < timerService.currentWatermark()) {


> Fix watermark semantics in CEP operators
> 
>
> Key: FLINK-7563
> URL: https://issues.apache.org/jira/browse/FLINK-7563
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Aljoscha Krettek
>Assignee: Yueting Chen
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> See 
> https://lists.apache.org/thread.html/3541e72ba3842192e58a487e54c2817f6b2b9d12af5fee97af83e5df@%3Cdev.flink.apache.org%3E
>  for reference.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)

2017-09-15 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167643#comment-16167643
 ] 

Aljoscha Krettek commented on FLINK-7589:
-

I don't think we want to do that, no. However, we have been thinking of 
including an S3AFileSystem that has all dependencies shaded away, so that 
people can use it without having to fight with any of the Hadoop dependencies.

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> ---
>
> Key: FLINK-7589
> URL: https://issues.apache.org/jira/browse/FLINK-7589
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>   at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>   at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>   at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>   at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at java.io.DataInputStream.readLong(DataInputStream.java:416)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4632: [FLINK-7563] [cep] Fix watermark semantics in cep and rel...

2017-09-15 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4632
  
Hi @yestinchen,
Thanks for working on this issue. Unfortunately your changes do not fix the 
semantics. Have a look at this failing test:

@Test
public void testCEPWatermarkSemantic() throws Exception {
    Event startEvent = new Event(42, "start", 1.0);
    SubEvent middleEvent = new SubEvent(42, "foo2", 2.0, 10.0);
    Event endEvent = new Event(42, "end", 1.0);

    SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator =
        getKeyedCepOperatorWithComparator(false);

    try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
             CepOperatorTestUtilities.getCepTestHarness(operator)) {
        harness.open();

        harness.processWatermark(0L);
        verifyWatermark(harness.getOutput().poll(), 0L);

        harness.processElement(new StreamRecord<>(startEvent, 1L));
        harness.processElement(new StreamRecord<>(middleEvent, 2L));
        harness.processElement(new StreamRecord<>(endEvent, 3L));

        assertTrue(operator.hasNonEmptyPQ(42));
        assertTrue(!operator.hasNonEmptyNFA(42));

        harness.processWatermark(3L);
        verifyWatermark(harness.getOutput().poll(), 3L);

        assertTrue(operator.hasNonEmptyPQ(42));
        assertTrue(operator.hasNonEmptyNFA(42));

        harness.processWatermark(4L);
        assertTrue(!operator.hasNonEmptyPQ(42));

        verifyPattern(harness.getOutput().poll(), startEvent, middleEvent, endEvent);
        verifyWatermark(harness.getOutput().poll(), 4L);
    }
}

To fix it you also have to fix the condition in 
`AbstractKeyedCEPPatternOperator::onEventTime:232`. Instead of:

while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {

it should be 

while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() < timerService.currentWatermark()) {


---


[jira] [Commented] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata

2017-09-15 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167638#comment-16167638
 ] 

Aljoscha Krettek commented on FLINK-4814:
-

Yes, those are things I'm aware of and I also find them quite annoying. 

> Remove extra storage location for externalized checkpoint metadata
> --
>
> Key: FLINK-4814
> URL: https://issues.apache.org/jira/browse/FLINK-4814
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Follow up for FLINK-4512.
> Store checkpoint metadata in the checkpoint directory. That makes it simpler 
> for users to track and clean up checkpoints manually, if they want to retain 
> externalized checkpoints across cancellations and terminal failures.
> Every state backend needs to be able to provide a storage location for the 
> checkpoint metadata. The memory state backend would hence not work with 
> externalized checkpoints, unless one explicitly sets a parameter such as 
> `setExternalizedCheckpointsLocation(uri)`.
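For reference, a sketch of how externalized checkpoints are enabled today; 
the YAML key at the end is the separate metadata location that this ticket 
proposes to fold into the checkpoint directory itself:

{code:java}
// Classes from org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// flink-conf.yaml (path is an example):
// state.checkpoints.dir: hdfs:///flink/checkpoints-metadata
{code}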



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-15 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167635#comment-16167635
 ] 

Aljoscha Krettek commented on FLINK-7606:
-

How do you mean? They could get the timed-out partially matched sequences and 
then all state for that key (NFA, element queue) should be cleared. Otherwise 
you will also have leaks in RocksDB.

If the memory/heap backend is working correctly (as I'm assuming for the 
moment) then this is not a problem of leaking memory but of leaking state, i.e. 
state that is never being cleaned up. If this is the case we'll have the same 
problem on any state backend, it might just be the case that we will see 
problems later in RocksDB because we have more space to work with.

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set as 5 minutes), indicating that two 
> incoming messages match the pattern only if they come in a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> Also the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows up (I suppose that the state 
> of NFA is updated) but it is not cleaned also after the 5 minutes of time 
> window defined in "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.
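To make the reported setup concrete, here is a small sketch of a keyed 
two-step pattern with a 5-minute {{within}} clause; the {{Message}} type and 
its accessors are invented for illustration:

{code:java}
// Pattern, SimpleCondition, CEP and PatternStream come from flink-cep;
// Time from org.apache.flink.streaming.api.windowing.time.
Pattern<Message, ?> pattern = Pattern.<Message>begin("first")
    .where(new SimpleCondition<Message>() {
        @Override
        public boolean filter(Message msg) {
            return msg.getType().equals("start");
        }
    })
    .followedBy("second")
    .where(new SimpleCondition<Message>() {
        @Override
        public boolean filter(Message msg) {
            return msg.getType().equals("end");
        }
    })
    .within(Time.minutes(5)); // partial matches older than 5 minutes should be pruned

PatternStream<Message> matches = CEP.pattern(messageStream.keyBy("key"), pattern);
{code}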



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4673: [hotfix] [cep] Fix afterMatchStrategy parameter missing i...

2017-09-15 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4673
  
Hi @yestinchen ,
thanks for spotting the mistake. Good catch. However the test you provided 
does not prove the fix. It passes with and without the fix. Please correct it. 
You can e.g. extend the Pattern like this:

Pattern<Tuple2<Integer, String>, ?> pattern =
    Pattern.<Tuple2<Integer, String>>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
        .where(new SimpleCondition<Tuple2<Integer, String>>() {
            @Override
            public boolean filter(Tuple2<Integer, String> rec) throws Exception {
                return rec.f1.equals("a");
            }
        }).times(2);

Then the expected output would be:

expected = "(1,a)\n(3,a)";


---


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4665
  
@Aitozi I meant that the place where you're currently counting dropped 
elements will not yield a correct count because one element might be in several 
windows. The place where we side-output late data is also the place to count, 
and there we can decide whether to count late data as dropped if we also 
side-output it. I think it's a question of personal preference.
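A sketch of what counting at the side-output point could look like; the field 
and metric names are assumptions, shown only to make the suggestion concrete:

```
// Inside the window operator, after window assignment: count a late element
// exactly once, where late data is side-output, rather than once per window.
if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null) {
        sideOutput(element);           // user opted in to receive late data
    } else {
        numLateRecordsDropped.inc();   // hypothetical dropped-elements counter
    }
}
```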


---


[jira] [Commented] (FLINK-7394) Implement basic InputChannel for credit-based logic

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167593#comment-16167593
 ] 

ASF GitHub Bot commented on FLINK-7394:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4499#discussion_r138916859
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -72,6 +79,21 @@
 */
private int expectedSequenceNumber = 0;
 
+   /** The initial number of exclusive buffers assigned to this channel. */
+   private int initialCredit;
--- End diff --

Do we need this variable? In the current PR, it is not read anywhere.


> Implement basic InputChannel for credit-based logic
> ---
>
> Key: FLINK-7394
> URL: https://issues.apache.org/jira/browse/FLINK-7394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control. 
> The basic works are:
> * Propose the {{BufferListener}} interface for notifying of buffer 
> availability and buffer destruction (see the sketch after this list).
> * {{RemoteInputChannel}} implements the {{BufferRecycler}} interface to manage 
> the exclusive buffers itself.
> * {{RemoteInputChannel}} implements the {{BufferListener}} interface to be 
> notified repeatedly.
> * {{RemoteInputChannel}} maintains and notifies of unannounced credit.
> * {{RemoteInputChannel}} maintains current sender backlog to trigger requests 
> of floating buffers.
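For illustration, the {{BufferListener}} interface sketched from the 
description above; the exact method signatures are assumptions:

{code:java}
// Listener registered with a buffer provider; notified when a buffer
// becomes available again or when the provider is destroyed.
public interface BufferListener {

    /**
     * Called whenever a buffer becomes available again.
     *
     * @return true if the listener wants to be notified of further buffers
     */
    boolean notifyBufferAvailable(Buffer buffer);

    /** Called when the buffer provider is destroyed. */
    void notifyBufferDestroyed();
}
{code}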



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

