[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15512181#comment-15512181
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
I agree with @delding here. 

> * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends.
> * The ordering of execution of the actions is not defined. Meaning if you do a Put and a
> * Get in the same {@link #batch} call, you will not necessarily be
> * guaranteed that the Get returns what the Put had put.

is the javadoc for the batch API. Anyway, we don't do a Get operation here, but the order of execution among the mutations is still not guaranteed.
Regarding

> storing input records in a state backend and flushing to HBase upon receiving a checkpoint barrier.

Where is this being done in Flink? Just for understanding.
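The undefined-ordering caveat quoted above can be illustrated without any HBase dependency. In this sketch a plain map stands in for an HBase table and `apply` is a hypothetical helper, not an HBase API: two Puts to the same row leave different final cell values depending on the order the server happens to execute them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a batch of Puts: each mutation is {row, value}, and the
// "table" is just a map from row key to the last value written to it.
final class BatchOrderDemo {
    static Map<String, String> apply(List<String[]> mutations) {
        Map<String, String> table = new HashMap<>();
        for (String[] m : mutations) {
            table.put(m[0], m[1]); // the later mutation to the same row wins
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> batch = Arrays.asList(
                new String[]{"row1", "a"},
                new String[]{"row1", "b"}); // two Puts to the same row
        List<String[]> reversed = Arrays.asList(batch.get(1), batch.get(0));

        // Because batch execution order is undefined, either outcome is legal:
        System.out.println(BatchOrderDemo.apply(batch));    // {row1=b}
        System.out.println(BatchOrderDemo.apply(reversed)); // {row1=a}
    }
}
```

For mutations to distinct rows the final state is the same either way, which is why the concern only bites when one batch carries multiple mutations for the same row.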


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15512173#comment-15512173
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
> #2330 updates the version of the batch TableInputFormat to HBase 1.1.2. I think we should use the same version here.

Valid point. But is it possible to upgrade to an even newer version, like 1.1.6 or the branch-1.2 series?






[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-21 Thread delding
Github user delding commented on the issue:

https://github.com/apache/flink/pull/2332
  
I can change back to HBase 1.1.2. The reason for using 2.0.0-SNAPSHOT is this bug: https://issues.apache.org/jira/browse/HBASE-14963. It is only fixed in 1.3.0+ and 2.0.0, and my example fails to run because of it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15511945#comment-15511945
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user delding commented on the issue:

https://github.com/apache/flink/pull/2332
  
Hi @fhueske, in HBase, writes to a single row have an ACID guarantee. Exactly-once semantics could be implemented the way the CassandraSink did it: storing input records in a state backend and flushing to HBase upon receiving a checkpoint barrier. One concern is that the order of execution of these writes is not defined when making such a batch call. In other words, a write of an earlier record could be observed later, but that can also be true when sending each write immediately. So what do you think about implementing fault tolerance the same way as the Cassandra Sink?
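The buffer-and-flush-on-barrier idea above can be sketched without any Flink or HBase dependency. All class and method names here are hypothetical stand-ins, not Flink's actual sink API: records are only buffered in `invoke`, and the whole buffer is handed to a batch writer when the checkpoint barrier arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal model of flush-on-checkpoint: nothing is written between
// barriers, so a failed checkpoint interval can be replayed without
// having partially applied its writes.
final class BufferingSink<T> {
    private final List<T> pending = new ArrayList<>(); // stand-in for operator state
    private final Consumer<List<T>> batchWriter;       // stand-in for Table.batch(...)

    BufferingSink(Consumer<List<T>> batchWriter) {
        this.batchWriter = batchWriter;
    }

    void invoke(T record) {
        pending.add(record); // buffer only; nothing reaches HBase yet
    }

    void onCheckpointBarrier() {
        if (!pending.isEmpty()) {
            // One batch per checkpoint interval; ordering inside the batch
            // is still undefined on the HBase side.
            batchWriter.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

A real implementation would snapshot `pending` into checkpointed state and clear it only after the checkpoint completes; this sketch only shows the buffering discipline being discussed.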






[GitHub] flink pull request #2532: [FLINK-4494] Expose the TimeServiceProvider from t...

2016-09-21 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2532

[FLINK-4494] Expose the TimeServiceProvider from the Task to each Operator.

This is a PR for both [FLINK-4496] and [FLINK-4494].
R: @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink timeprovider_exposing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2532.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2532


commit e1522ace235af2a524bce96e180a7d2729c5e6a8
Author: kl0u 
Date:   2016-08-25T15:38:49Z

[FLINK-4496] Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.

commit 2ebdbdafa41d04355cce4b27954c53013d0bc4ba
Author: kl0u 
Date:   2016-09-21T15:08:29Z

Updating the PR.

commit 02b120f6bec36c88c9dae9f20fa4b7d74b42cdaf
Author: kl0u 
Date:   2016-09-20T12:45:01Z

[FLINK-4494] Expose the TimeServiceProvider from the Task to each Operator.






[jira] [Commented] (FLINK-4650) Frequent task manager disconnects from JobManager

2016-09-21 Thread Nagarjun Guraja (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15511311#comment-15511311
 ] 

Nagarjun Guraja commented on FLINK-4650:


[~StephanEwen] I haven't spent a lot of time debugging it on the 1.2-SNAPSHOT, but the stack traces are similar to the one below (the node was reachable, and there were no issues with network connectivity):

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'titus-248496-worker-0-2/100.82.8.187:56858'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

Do you want us to look for any specific log messages to determine the root cause?

> Frequent task manager disconnects from JobManager
> -
>
> Key: FLINK-4650
> URL: https://issues.apache.org/jira/browse/FLINK-4650
> Project: Flink
>  Issue Type: Bug
>Reporter: Nagarjun Guraja
>
> Not sure of the exact reason but we observe more frequent task manager 
> disconnects while using 1.2 snapshot build as compared to 1.1.2 release build





[jira] [Updated] (FLINK-4660) HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop

2016-09-21 Thread Zhenzhong Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu updated FLINK-4660:

Description: 
A Flink job with checkpoints enabled and configured to use the S3A file system backend sometimes experiences checkpointing failures due to S3 consistency issues. This behavior is also reported by other people and documented in https://issues.apache.org/jira/browse/FLINK-4218.

The problem is magnified by the current HadoopFileSystem implementation, which can potentially leak S3 client connections and eventually get into a restarting loop with a "Timeout waiting for a connection from pool" exception thrown from the AWS client.

I looked at the code; it seems HadoopFileSystem.java never invokes close() on the fs object upon failure, but the FileSystem may be re-initialized every time the job gets restarted.

A few pieces of evidence I observed:
1. When I set the connection pool limit to 128, the commands below show 128 connections stuck in the CLOSE_WAIT state.
!Screen Shot 2016-09-20 at 2.49.14 PM.png|align=left, vspace=5! 

2. Task manager logs indicate that the state backend file system is consistently re-initialized upon job restart.
!Screen Shot 2016-09-20 at 2.49.32 PM.png!

3. Logs indicate an NPE during cleanup of the stream task, caused by a "Timeout waiting for connection from pool" exception when trying to create a directory in the S3 bucket.
2016-09-02 08:17:50,886 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during cleanup of stream task
java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
    at java.lang.Thread.run(Thread.java:745)

4. It appears that, from invoking the checkpointing operation through handling the failure, StreamTask has no logic for closing the Hadoop FileSystem object (which internally holds the S3 AWS client) in HadoopFileSystem.java.
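The leak pattern the report describes (close() never invoked on the failure path, one stranded connection per restart) can be modeled in a few lines of plain Java. `PooledFs` is a hypothetical stand-in for the Hadoop FileSystem, not the real class:

```java
// Each PooledFs instance "holds" one pooled connection until close() runs.
final class PooledFs implements AutoCloseable {
    static int openConnections = 0;

    PooledFs() { openConnections++; }                    // take a connection from the pool
    @Override public void close() { openConnections--; } // return it

    void mkdirs() throws Exception {
        // Simulate the failing S3 call from the report.
        throw new Exception("Timeout waiting for connection from pool");
    }
}

final class LeakDemo {
    // Buggy restart path: the exception escapes before close(), so one
    // connection is stranded per restart, matching the CLOSE_WAIT pile-up.
    static void restartLeaky() {
        PooledFs fs = new PooledFs();
        try {
            fs.mkdirs();
        } catch (Exception ignored) {
            // fs is never closed here
        }
    }

    // Fixed restart path: try-with-resources closes fs even on failure.
    static void restartSafe() {
        try (PooledFs fs = new PooledFs()) {
            fs.mkdirs();
        } catch (Exception ignored) {
        }
    }
}
```

Under a restart loop the leaky variant accumulates open connections without bound until the pool is exhausted, while the safe variant stays at zero; the same try-with-resources (or try/finally) discipline around the real FileSystem would be the obvious fix.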



> HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in 
> a restarting loop
> ---
>
> Key: FLINK-4660
> URL: https://issues.apache.org/jira/browse/FLINK-4660
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Zhenzhong Xu
>Priority: Critical
> Attachments: Screen Shot 2016-09-20 at 2.49.14 PM.png, Screen Shot 
> 2016-09-20 at 2.49.32 PM.png
>
>
> Flink job with checkpoints enabled and configured to use S3A file system 
> backend, sometimes experiences checkpointing failure due to S3 consistency 
> issue. This behavior is also reported by other people and documented in 
> 



[jira] [Updated] (FLINK-4660) HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop

2016-09-21 Thread Zhenzhong Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu updated FLINK-4660:

Attachment: Screen Shot 2016-09-20 at 2.49.32 PM.png
Screen Shot 2016-09-20 at 2.49.14 PM.png



[jira] [Updated] (FLINK-4660) HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop

2016-09-21 Thread Zhenzhong Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu updated FLINK-4660:

Description: 
Flink job with checkpoints enabled and configured to use S3A file system 
backend, sometimes experiences checkpointing failure due to S3 consistency 
issue. This behavior is also reported by other people and documented in 
https://issues.apache.org/jira/browse/FLINK-4218.

This problem gets magnified by current HadoopFileSystem implementation, which 
can potentially leak S3 client connections, and eventually get into a 
restarting loop with “Timeout waiting for a connection from pool” exception 
thrown from aws client.

I looked at the code, seems HadoopFileSystem.java never invoke close() method 
on fs object upon failure, but the FileSystem may be re-initialized every time 
the job gets restarted.

A few evidence I observed:
1. When I set the connection pool limit to 128, and below commands shows 128 
connections are stuck in CLOSE_WAIT state.
 !
2. Task manager logs indicate that the state backend file system is 
consistently re-initialized upon job restart.

3. Logs indicate an NPE during cleanup of the stream task, caused by a "Timeout 
waiting for connection from pool" exception when trying to create a directory 
in the S3 bucket.
2016-09-02 08:17:50,886 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask - Error during cleanup of 
stream task
java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
at java.lang.Thread.run(Thread.java:745)

4. It appears that, from invoking the checkpointing operation through handling 
its failure, StreamTask has no logic for closing the Hadoop FileSystem object 
(which internally holds the S3 AWS client object) that resides in 
HadoopFileSystem.java.

  was:
A Flink job with checkpoints enabled and configured to use the S3A file system 
backend sometimes experiences checkpointing failures due to S3 consistency 
issues. This behavior is also reported by other people and documented in 
https://issues.apache.org/jira/browse/FLINK-4218.

This problem is magnified by the current HadoopFileSystem implementation, which 
can leak S3 client connections and eventually get the job into a restarting 
loop with a "Timeout waiting for a connection from pool" exception thrown by 
the AWS client.

I looked at the code; it seems HadoopFileSystem.java never invokes close() on 
the fs object upon failure, yet the FileSystem may be re-initialized every time 
the job gets restarted.

A few pieces of evidence I observed:
1. When I set the connection pool limit to 128, the commands below (attached as 
a screenshot) show 128 connections stuck in the CLOSE_WAIT state.

2. Task manager logs indicate that the state backend file system is 
consistently re-initialized upon job restart.

3. Logs indicate an NPE during cleanup of the stream task, caused by a "Timeout 
waiting for connection from pool" exception when trying to create a directory 
in the S3 bucket.
2016-09-02 08:17:50,886 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask - Error during cleanup of 
stream task
java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
at java.lang.Thread.run(Thread.java:745)

4. It appears that, from invoking the checkpointing operation through handling 
its failure, StreamTask has no logic for closing the Hadoop FileSystem object 
(which internally holds the S3 AWS client object) that resides in 
HadoopFileSystem.java.


> HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in 
> a restarting loop
> ---
>
> Key: FLINK-4660
> URL: https://issues.apache.org/jira/browse/FLINK-4660
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Zhenzhong Xu
>Priority: Critical
> Attachments: Screen Shot 2016-09-20 at 2.49.14 PM.png, Screen Shot 
> 2016-09-20 at 2.49.32 PM.png
>
>
> A Flink job with checkpoints enabled and configured to use the S3A file system 
> backend sometimes experiences checkpointing failures due to S3 consistency 
> issues. This behavior is also reported by other people and documented in 
> https://issues.apache.org/jira/browse/FLINK-4218.
> This problem is magnified by the current HadoopFileSystem implementation, which 
> can leak S3 client connections and eventually get the job into a 
> restarting loop with "Timeout 

[jira] [Created] (FLINK-4660) HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop

2016-09-21 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4660:
---

 Summary: HadoopFileSystem (with S3A) may leak connections, which 
cause job to stuck in a restarting loop
 Key: FLINK-4660
 URL: https://issues.apache.org/jira/browse/FLINK-4660
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Zhenzhong Xu
Priority: Critical








[jira] [Created] (FLINK-4659) Potential resource leak due to unclosed InputStream in SecurityContext#populateSystemSecurityProperties()

2016-09-21 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4659:
-

 Summary: Potential resource leak due to unclosed InputStream in 
SecurityContext#populateSystemSecurityProperties()
 Key: FLINK-4659
 URL: https://issues.apache.org/jira/browse/FLINK-4659
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
try {
    Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, "");
    InputStream jaasConfStream =
        SecurityContext.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
    Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
    jaasConfFile = jaasConfPath.toFile();
    jaasConfFile.deleteOnExit();
} catch (IOException e) {
    throw new RuntimeException("SASL auth is enabled for ZK but unable to " +
{code}
jaasConfStream should be closed in a finally block.
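One way to guarantee the close is try-with-resources, which invokes close() even when the copy throws. The sketch below is a generic reduction under assumed names (`JaasConfSketch`, `copyToTemp`), not the actual Flink patch:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class JaasConfSketch {

    /** Copies the given stream to a temp file, always closing the stream. */
    static Path copyToTemp(InputStream source, String prefix) throws IOException {
        Path tmp = Files.createTempFile(prefix, "");
        // try-with-resources calls source.close() even if Files.copy throws,
        // which is exactly the leak the issue points out.
        try (InputStream in = source) {
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
        }
        tmp.toFile().deleteOnExit();
        return tmp;
    }
}
```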





[jira] [Commented] (FLINK-4248) CsvTableSource does not support reading SqlTimeTypeInfo types

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510703#comment-15510703
 ] 

ASF GitHub Bot commented on FLINK-4248:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2303
  
I still think that methods should be named after what they do (and there is 
nothing numeric about their behavior), not after the context in which they are 
supposed to be called. The error messages are numeric due to the original 
context of the code that has been moved into separate methods; that context is 
no longer present in these methods. Anyway, I don't want to start a 
bikeshedding discussion about the names of internal utility methods and will 
stop at this point ;-)


> CsvTableSource does not support reading SqlTimeTypeInfo types
> -
>
> Key: FLINK-4248
> URL: https://issues.apache.org/jira/browse/FLINK-4248
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
>
> The Table API's {{CsvTableSource}} does not support to read all Table API 
> supported data types. For example, it is not possible to read 
> {{SqlTimeTypeInfo}} types via the {{CsvTableSource}}.





[GitHub] flink issue #2303: [FLINK-4248] [core] [table] CsvTableSource does not suppo...

2016-09-21 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2303
  
I still think that methods should be named after what they do (and there is 
nothing numeric about their behavior), not after the context in which they are 
supposed to be called. The error messages are numeric due to the original 
context of the code that has been moved into separate methods; that context is 
no longer present in these methods. Anyway, I don't want to start a 
bikeshedding discussion about the names of internal utility methods and will 
stop at this point ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2479: [FLINK-4537] [cluster management] ResourceManager registr...

2016-09-21 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2479
  
Thank you for the pull request! I've rebased the changes and will merge 
them to the master.




[jira] [Commented] (FLINK-4537) ResourceManager registration with JobManager

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510670#comment-15510670
 ] 

ASF GitHub Bot commented on FLINK-4537:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2479
  
Thank you for the pull request! I've rebased the changes and will merge 
them to the master.


> ResourceManager registration with JobManager
> 
>
> Key: FLINK-4537
> URL: https://issues.apache.org/jira/browse/FLINK-4537
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: zhangjing
>
> The ResourceManager keeps track of all JobManagers which execute jobs. When 
> a new JobManager registers, its leadership status is checked through the 
> HighAvailabilityServices. It will then be registered at the ResourceManager 
> using the {{JobID}} provided with the initial registration message.
> The ResourceManager should use the JobID and LeaderSessionID (notified by the 
> HighAvailabilityServices) to identify a session to the JobMaster.
> When JobManagers register at the ResourceManager, the registration takes the 
> following 2 input parameters:
> 1. resourceManagerLeaderId: the fencing token for the ResourceManager leader, 
> which is kept by the JobMaster who sends the registration
> 2. JobMasterRegistration: contains address and JobID
> The ResourceManager needs to process the registration event based on the 
> following steps:
> 1. Check whether the input resourceManagerLeaderId is the same as the current 
> leaderSessionId of the ResourceManager. If not, two or more ResourceManagers 
> may exist at the same time, and the current ResourceManager is not the proper 
> one, so it rejects or ignores the registration.
> 2. Check whether a valid JobMaster exists at the given address by connecting 
> to it. Reject registrations from an invalid address. (Hidden in the connect 
> logic.)
> 3. Keep the JobID and JobMasterGateway mapping relationship.
> 4. Start a JobMasterLeaderListener for the given JobID to listen to the 
> leadership of the specified JobMaster.
> 5. Send a registration-successful ack to the JobMaster.





[jira] [Commented] (FLINK-4535) ResourceManager registration with TaskExecutor

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510667#comment-15510667
 ] 

ASF GitHub Bot commented on FLINK-4535:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2451
  
Thank you for the pull request! I've rebased the changes and will merge 
them to the master.


> ResourceManager registration with TaskExecutor
> --
>
> Key: FLINK-4535
> URL: https://issues.apache.org/jira/browse/FLINK-4535
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhangjing
>Assignee: zhangjing
>
> When a TaskExecutor registers at the ResourceManager, it passes the following 
> 3 input parameters:
> 1. resourceManagerLeaderId: the fencing token for the ResourceManager leader, 
> which is kept by the TaskExecutor that sends the registration
> 2. taskExecutorAddress: the address of the TaskExecutor
> 3. resourceID: the resource ID of the TaskExecutor that registers
> The ResourceManager needs to process the registration based on the following 
> steps:
> 1. Check whether the input resourceManagerLeaderId is the same as the current 
> leadershipSessionId of the ResourceManager. If not, two or more 
> ResourceManagers may exist at the same time and the current ResourceManager is 
> not the proper one, so it rejects or ignores the registration.
> 2. Check whether a valid TaskExecutor exists at the given address by 
> connecting to it. Reject registrations from invalid addresses.
> 3. Check whether the input resourceId is a duplicate registration; if so, 
> reject the registration.
> 4. Keep the resourceID-to-taskExecutorGateway mapping, and optionally the 
> resourceID-to-container mapping in YARN mode.
> 5. Create the connection between ResourceManager and TaskExecutor, and ensure 
> it stays healthy via heartbeat RPC calls between RM and TM.
> 6. Send a registration-success ack to the TaskExecutor.
> Discussion:
> Maybe we need to introduce an error code or several registration-decline 
> subclasses to distinguish the different causes of declined registrations.
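The fencing (step 1) and duplicate-registration (step 3) checks described above can be sketched as follows. This is a minimal stand-alone sketch; the class and method names are illustrative, not the actual Flink API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of steps 1, 3, 4 and 6 of the registration procedure.
public class RegistrationSketch {
    private final UUID leaderSessionId;
    // Step 4: resourceID -> taskExecutor address (stand-in for the gateway mapping).
    private final Map<String, String> registeredExecutors = new HashMap<>();

    public RegistrationSketch(UUID leaderSessionId) {
        this.leaderSessionId = leaderSessionId;
    }

    /** Returns a decline reason, or null on success (the "ack" of step 6). */
    public String registerTaskExecutor(UUID resourceManagerLeaderId,
                                       String taskExecutorAddress,
                                       String resourceId) {
        // Step 1: fencing - reject registrations carrying a stale leader id.
        if (!leaderSessionId.equals(resourceManagerLeaderId)) {
            return "DECLINED: wrong leader session id";
        }
        // Step 3: reject duplicate registrations for the same resource id.
        if (registeredExecutors.containsKey(resourceId)) {
            return "DECLINED: duplicate registration";
        }
        // Step 4: keep the resourceID -> address/gateway mapping.
        registeredExecutors.put(resourceId, taskExecutorAddress);
        return null;
    }
}
```

The address-validation (step 2) and heartbeat (step 5) parts are omitted since they need a real RPC connection.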



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2451: [FLINK-4535] [cluster management] resourceManager process...

2016-09-21 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2451
  
Thank you for the pull request! I've rebased the changes and will merge 
them to the master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4506) CsvOutputFormat defaults allowNullValues to false, even though doc and declaration says true

2016-09-21 Thread Michael Wong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510589#comment-15510589
 ] 

Michael Wong commented on FLINK-4506:
-

Ok, but let's forget about what was there before for the moment. What is the 
desired behavior? What is the most likely setting people would want? My bet is 
allowNullValues=true.

> CsvOutputFormat defaults allowNullValues to false, even though doc and 
> declaration says true
> 
>
> Key: FLINK-4506
> URL: https://issues.apache.org/jira/browse/FLINK-4506
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Documentation
>Reporter: Michael Wong
>Assignee: Kirill Morozov
>Priority: Minor
>
> In the constructor, it has this
> {code}
> this.allowNullValues = false;
> {code}
> But in the setAllowNullValues() method, the doc says allowNullValues is 
> true by default. Also, in the declaration of allowNullValues, the value is 
> set to true. It probably makes the most sense to change the constructor.
> {code}
>   /**
>    * Configures the format to either allow null values (writing an empty field),
>    * or to throw an exception when encountering a null field.
>    * <p>
>    * by default, null values are allowed.
>    *
>    * @param allowNulls Flag to indicate whether the output format should accept null values.
>    */
>   public void setAllowNullValues(boolean allowNulls) {
>       this.allowNullValues = allowNulls;
>   }
> {code}
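The mismatch described above (a declaration default that the constructor silently overrides) can be illustrated with a minimal skeleton. This is a hypothetical sketch, not the actual CsvOutputFormat source.

```java
// Hypothetical skeleton illustrating the reported inconsistency and the fix.
public class CsvFormatSketch {
    // The field declaration and the Javadoc both say the default is true ...
    private boolean allowNullValues = true;

    public CsvFormatSketch() {
        // ... the reported bug is a constructor line `this.allowNullValues = false;`
        // that silently flips the default. The fix is to drop that line (or set
        // it to true) so behavior matches the declaration and documentation:
        this.allowNullValues = true;
    }

    /** By default, null values are allowed. */
    public void setAllowNullValues(boolean allowNulls) {
        this.allowNullValues = allowNulls;
    }

    public boolean isAllowNullValues() {
        return allowNullValues;
    }
}
```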





[GitHub] flink pull request #2531: [FLINK-4658] [rpc] Allow RpcService to execute Run...

2016-09-21 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2531




[jira] [Closed] (FLINK-4658) Allow RpcService to execute Callables in the RpcService executor

2016-09-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4658.

Resolution: Fixed

Fixed via b5f6a06b01436a8b0e7243e3f8af7444f8058868

> Allow RpcService to execute Callables in the RpcService executor
> 
>
> Key: FLINK-4658
> URL: https://issues.apache.org/jira/browse/FLINK-4658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> In order to execute operations outside of the main executor thread, it would 
> be convenient to add an {{execute(Callable)}} method to the {{RpcService}}. 
> The {{execute}} method runs the given {{Callable}}/{{Runnable}} in its 
> {{ExecutionContext}} and, thus, outside of the main thread.





[jira] [Commented] (FLINK-4658) Allow RpcService to execute Callables in the RpcService executor

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510460#comment-15510460
 ] 

ASF GitHub Bot commented on FLINK-4658:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2531


> Allow RpcService to execute Callables in the RpcService executor
> 
>
> Key: FLINK-4658
> URL: https://issues.apache.org/jira/browse/FLINK-4658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> In order to execute operations outside of the main executor thread, it would 
> be convenient to add an {{execute(Callable)}} method to the {{RpcService}}. 
> The {{execute}} method runs the given {{Callable}}/{{Runnable}} in its 
> {{ExecutionContext}} and, thus, outside of the main thread.





[jira] [Commented] (FLINK-4658) Allow RpcService to execute Callables in the RpcService executor

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510452#comment-15510452
 ] 

ASF GitHub Bot commented on FLINK-4658:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2531
  
Thanks for the review @mxm. Will merge it.


> Allow RpcService to execute Callables in the RpcService executor
> 
>
> Key: FLINK-4658
> URL: https://issues.apache.org/jira/browse/FLINK-4658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> In order to execute operations outside of the main executor thread, it would 
> be convenient to add an {{execute(Callable)}} method to the {{RpcService}}. 
> The {{execute}} method runs the given {{Callable}}/{{Runnable}} in its 
> {{ExecutionContext}} and, thus, outside of the main thread.





[GitHub] flink issue #2531: [FLINK-4658] [rpc] Allow RpcService to execute Runnables ...

2016-09-21 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2531
  
Thanks for the review @mxm. Will merge it.




[GitHub] flink issue #2531: [FLINK-4658] [rpc] Allow RpcService to execute Runnables ...

2016-09-21 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2531
  
+1 Looks good!




[jira] [Commented] (FLINK-4658) Allow RpcService to execute Callables in the RpcService executor

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510440#comment-15510440
 ] 

ASF GitHub Bot commented on FLINK-4658:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2531
  
+1 Looks good!


> Allow RpcService to execute Callables in the RpcService executor
> 
>
> Key: FLINK-4658
> URL: https://issues.apache.org/jira/browse/FLINK-4658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> In order to execute operations outside of the main executor thread, it would 
> be convenient to add an {{execute(Callable)}} method to the {{RpcService}}. 
> The {{execute}} method runs the given {{Callable}}/{{Runnable}} in its 
> {{ExecutionContext}} and, thus, outside of the main thread.





[jira] [Commented] (FLINK-4656) Port existing code to use Flink's future abstraction

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510429#comment-15510429
 ] 

ASF GitHub Bot commented on FLINK-4656:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2530


> Port existing code to use Flink's future abstraction
> 
>
> Key: FLINK-4656
> URL: https://issues.apache.org/jira/browse/FLINK-4656
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Port existing code to use Flink's future abstraction





[jira] [Closed] (FLINK-4656) Port existing code to use Flink's future abstraction

2016-09-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4656.

Resolution: Fixed

Fixed via 31a091b930178bf2aec2881ee273fe0e5e17464d

> Port existing code to use Flink's future abstraction
> 
>
> Key: FLINK-4656
> URL: https://issues.apache.org/jira/browse/FLINK-4656
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Port existing code to use Flink's future abstraction





[GitHub] flink pull request #2530: [FLINK-4656] [rpc] Port the existing code to Flink...

2016-09-21 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2530




[jira] [Commented] (FLINK-4656) Port existing code to use Flink's future abstraction

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510423#comment-15510423
 ] 

ASF GitHub Bot commented on FLINK-4656:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2530
  
Will merge the PR since it's prone to merge conflicts.


> Port existing code to use Flink's future abstraction
> 
>
> Key: FLINK-4656
> URL: https://issues.apache.org/jira/browse/FLINK-4656
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Port existing code to use Flink's future abstraction





[GitHub] flink issue #2530: [FLINK-4656] [rpc] Port the existing code to Flink's own ...

2016-09-21 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2530
  
Will merge the PR since it's prone to merge conflicts.




[GitHub] flink pull request #2531: [FLINK-4658] [rpc] Allow RpcService to execute Run...

2016-09-21 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2531

[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in 
its executor

The PR is based on #2530.

The newly introduced methods are `void RpcService.execute(Runnable)` and 
`Future<T> RpcService.execute(Callable<T>)`. Both methods allow running code in 
the `Executor` of the `RpcService`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink extendRpcService

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2531.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2531


commit cc7cdd75d23f2d4d201c0458f4543be1bc7e09f0
Author: Till Rohrmann 
Date:   2016-09-21T15:26:21Z

[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction

commit 2af2ada199e9473f571b050bd2a2752e06236efb
Author: Till Rohrmann 
Date:   2016-09-21T16:16:27Z

[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in 
its executor






[jira] [Commented] (FLINK-4658) Allow RpcService to execute Callables in the RpcService executor

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510421#comment-15510421
 ] 

ASF GitHub Bot commented on FLINK-4658:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2531

[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in 
its executor

The PR is based on #2530.

The newly introduced methods are `void RpcService.execute(Runnable)` and 
`Future<T> RpcService.execute(Callable<T>)`. Both methods allow running code in 
the `Executor` of the `RpcService`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink extendRpcService

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2531.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2531


commit cc7cdd75d23f2d4d201c0458f4543be1bc7e09f0
Author: Till Rohrmann 
Date:   2016-09-21T15:26:21Z

[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction

commit 2af2ada199e9473f571b050bd2a2752e06236efb
Author: Till Rohrmann 
Date:   2016-09-21T16:16:27Z

[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in 
its executor




> Allow RpcService to execute Callables in the RpcService executor
> 
>
> Key: FLINK-4658
> URL: https://issues.apache.org/jira/browse/FLINK-4658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> In order to execute operations outside of the main executor thread, it would 
> be convenient to add an {{execute(Callable)}} method to the {{RpcService}}. 
> The {{execute}} method runs the given {{Callable}}/{{Runnable}} in its 
> {{ExecutionContext}} and, thus, outside of the main thread.





[jira] [Comment Edited] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-21 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509946#comment-15509946
 ] 

Anton Mushin edited comment on FLINK-4564 at 9/21/16 4:10 PM:
--

Not everything is clear to me yet.
What should the {{MetricGroup#getMetricIdentifier()}} call look like in this case? 
If we use a single reporter for other reporters and know their indexes, we must 
know the name or index at the point where {{MetricGroup#getMetricIdentifier()}} 
is called, and then MetricRegistry needs a method for getting the indexes of 
reporters via their names. 
Or is this whole assumption incorrect?


was (Author: anmu):
Not all clear.
How should look like MetricGroup#getMetricIdentifier() call in this case? 
If we use single reporter for other reporters and know their indexes, we must 
know name or index where we MetricGroup#getMetricIdentifier() call. and then in 
MetricRegistry need add method for getting indexes of reporters via names. 
or all this assumption is not correct?

> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.
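The per-reporter delimiter with a global fallback proposed above can be sketched as follows. This is a hypothetical illustration; the class and method names are not the actual Flink metrics API.

```java
// Hypothetical sketch: each reporter may configure its own delimiter;
// reporters without one fall back to the global setting.
public class DelimiterConfig {
    private final char globalDelimiter;
    private final Character[] perReporterDelimiters; // null entry = not configured

    public DelimiterConfig(char globalDelimiter, Character... perReporterDelimiters) {
        this.globalDelimiter = globalDelimiter;
        this.perReporterDelimiters = perReporterDelimiters;
    }

    /** Delimiter for the reporter at the given index, falling back to the global one. */
    public char delimiterFor(int reporterIndex) {
        if (reporterIndex >= 0 && reporterIndex < perReporterDelimiters.length
                && perReporterDelimiters[reporterIndex] != null) {
            return perReporterDelimiters[reporterIndex];
        }
        return globalDelimiter;
    }

    /** Builds an identifier like "host.taskmanager.metric" using the reporter's delimiter. */
    public String metricIdentifier(int reporterIndex, String... scope) {
        return String.join(String.valueOf(delimiterFor(reporterIndex)), scope);
    }
}
```

This is one way to resolve the question in the comment above: the identifier is built per reporter index, so the registry needs a lookup from reporter name to index (not shown).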





[jira] [Resolved] (FLINK-4645) Hard to register Kryo Serializers due to generics

2016-09-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4645.
-
Resolution: Fixed

Fixed via 82ef021cb8f77635bcec61d49eedce1ddefd1e48

> Hard to register Kryo Serializers due to generics
> -
>
> Key: FLINK-4645
> URL: https://issues.apache.org/jira/browse/FLINK-4645
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> It currently does not work to do this:
> {code}
> env.registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class);
> {code}
> instead one needs to do this:
> {code}
> env.registerTypeWithKryoSerializer(TreeMultimap.class, (Class<? extends Serializer<?>>) JavaSerializer.class);
> {code}
> The fix would be to change the signature of the environment method from
> {code}
> public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
> {code}
> to
> {code}
> public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)
> {code}





[jira] [Closed] (FLINK-4645) Hard to register Kryo Serializers due to generics

2016-09-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4645.
---

> Hard to register Kryo Serializers due to generics
> -
>
> Key: FLINK-4645
> URL: https://issues.apache.org/jira/browse/FLINK-4645
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> It currently does not work to do this:
> {code}
> env.registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class);
> {code}
> instead one needs to do this:
> {code}
> env.registerTypeWithKryoSerializer(TreeMultimap.class, (Class<? extends Serializer<?>>) JavaSerializer.class);
> {code}
> The fix would be to change the signature of the environment method from
> {code}
> public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
> {code}
> to
> {code}
> public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)
> {code}





[GitHub] flink pull request #2522: Parameterize Flink version in Quickstart bash scri...

2016-09-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2522




[jira] [Closed] (FLINK-4640) Serialization of the initialValue of a Fold on WindowedStream fails

2016-09-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4640.
---

> Serialization of the initialValue of a Fold on WindowedStream fails
> ---
>
> Key: FLINK-4640
> URL: https://issues.apache.org/jira/browse/FLINK-4640
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.2
>Reporter: Fabian Hueske
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The following program
> {code}
> DataStream<Tuple2<String, Long>> src = env.fromElements(new Tuple2<String, Long>("a", 1L));
> src
>   .keyBy(1)
>   .timeWindow(Time.minutes(5))
>   .fold(TreeMultimap.create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>     @Override
>     public TreeMultimap<Long, String> fold(
>         TreeMultimap<Long, String> topKSoFar,
>         Tuple2<String, Long> itemCount) throws Exception
>     {
>       String item = itemCount.f0;
>       Long count = itemCount.f1;
>       topKSoFar.put(count, item);
>       if (topKSoFar.keySet().size() > 10) {
>         topKSoFar.removeAll(topKSoFar.keySet().first());
>       }
>       return topKSoFar;
>     }
>   });
> {code}
> throws this exception
> {quote}
> Caused by: java.lang.RuntimeException: Could not add value to folding state.
>   at 
> org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:91)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at 
> com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:192)
>   at 
> com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:121)
>   at com.google.common.collect.TreeMultimap.put(TreeMultimap.java:78)
>   at 
> org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:115)
>   at 
> org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:109)
>   at 
> org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:85)
>   ... 6 more
> {quote}
> The exception is caused because the initial value was not correctly 
> deserialized and is {{null}}.
> The user reporting this issue said that using the same {{FoldFunction}} on a 
> {{KeyedStream}} (without a window) works fine.
> I tracked the problem down to the serialization of the {{StateDescriptor}}, 
> i.e., the {{writeObject()}} and {{readObject()}} methods. The methods use 
> Flink's TypeSerializers to serialize the default value. In case of the 
> {{TreeMultimap}} this is the {{KryoSerializer}}, which fails to read the 
> serialized data for some reason.
> A quick workaround to solve this issue would be to check if the default value 
> implements {{Serializable}} and use Java Serialization in this case. However, 
> it would be good to track the root cause of this problem.
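The workaround suggested above (fall back to Java serialization when the default value implements {{Serializable}}) can be sketched as a small round-trip helper. This is a hypothetical helper, not the actual StateDescriptor code; checked exceptions are wrapped unchecked to keep the sketch short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch of the proposed fallback: Java-serialize the default value instead of
// relying on a TypeSerializer that may fail to read it back (yielding null).
public class DefaultValueRoundTrip {
    public static byte[] write(Object defaultValue) {
        if (!(defaultValue instanceof Serializable)) {
            throw new IllegalArgumentException(
                "default value is not Serializable; a TypeSerializer would be needed here");
        }
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(defaultValue);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static Object read(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject(); // a non-null default never comes back null here
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```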





[jira] [Resolved] (FLINK-4640) Serialization of the initialValue of a Fold on WindowedStream fails

2016-09-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4640.
-
Resolution: Fixed

Fixed in
  - 1.1.3 via 52a4440d916fb450c4999f6e1f42f392e247b426
  - 1.2.0 via 4d4eb64be7490672771243147824a70d3d47c501

> Serialization of the initialValue of a Fold on WindowedStream fails
> ---
>
> Key: FLINK-4640
> URL: https://issues.apache.org/jira/browse/FLINK-4640
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.2
>Reporter: Fabian Hueske
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> The following program
> {code}
> DataStream<Tuple2<String, Long>> src = env.fromElements(new Tuple2<String, Long>("a", 1L));
> src
>   .keyBy(1)
>   .timeWindow(Time.minutes(5))
>   .fold(TreeMultimap.create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>     @Override
>     public TreeMultimap<Long, String> fold(
>         TreeMultimap<Long, String> topKSoFar,
>         Tuple2<String, Long> itemCount) throws Exception
>     {
>       String item = itemCount.f0;
>       Long count = itemCount.f1;
>       topKSoFar.put(count, item);
>       if (topKSoFar.keySet().size() > 10) {
>         topKSoFar.removeAll(topKSoFar.keySet().first());
>       }
>       return topKSoFar;
>     }
>   });
> {code}
> throws this exception
> {quote}
> Caused by: java.lang.RuntimeException: Could not add value to folding state.
>   at 
> org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:91)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at 
> com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:192)
>   at 
> com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:121)
>   at com.google.common.collect.TreeMultimap.put(TreeMultimap.java:78)
>   at 
> org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:115)
>   at 
> org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:109)
>   at 
> org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:85)
>   ... 6 more
> {quote}
> The exception is caused because the initial value was not correctly 
> deserialized and is {{null}}.
> The user reporting this issue said that using the same {{FoldFunction}} on a 
> {{KeyedStream}} (without a window) works fine.
> I tracked the problem down to the serialization of the {{StateDescriptor}}, 
> i.e., the {{writeObject()}} and {{readObject()}} methods. The methods use 
> Flink's TypeSerializers to serialize the default value. In case of the 
> {{TreeMultimap}} this is the {{KryoSerializer}}, which fails to read the 
> serialized data for some reason.
> A quick workaround to solve this issue would be to check if the default value 
> implements {{Serializable}} and use Java Serialization in this case. However, 
> it would be good to track the root cause of this problem.





[jira] [Created] (FLINK-4658) Allow RpcService to execute Callables in the RpcService executor

2016-09-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4658:


 Summary: Allow RpcService to execute Callables in the RpcService 
executor
 Key: FLINK-4658
 URL: https://issues.apache.org/jira/browse/FLINK-4658
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Reporter: Till Rohrmann


In order to execute operations outside of the main executor thread, it would be 
convenient to add an {{execute(Callable)}} method to the {{RpcService}}. The 
{{execute}} method runs the given {{Callable}}/{{Runnable}} in its 
{{ExecutionContext}} and, thus, outside of the main thread.
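The two proposed methods can be sketched with a plain `ExecutorService` backing them. This is a hypothetical stand-in, not the actual Flink RpcService interface; the `callSync` convenience method exists only to keep the sketch testable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of execute(Runnable) and execute(Callable) backed by an executor,
// so work runs outside the caller's (main) thread.
public class ExecutorRpcService {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Runs the Runnable in the service's executor. */
    public void execute(Runnable runnable) {
        executor.execute(runnable);
    }

    /** Runs the Callable in the service's executor and exposes the result as a Future. */
    public <T> Future<T> execute(Callable<T> callable) {
        return executor.submit(callable);
    }

    /** Convenience for the sketch: run in the executor and block for the result. */
    public <T> T callSync(Callable<T> callable) {
        try {
            return execute(callable).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        executor.shutdown();
    }
}
```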





[jira] [Updated] (FLINK-4651) Re-register processing time timers at the WindowOperator upon recovery.

2016-09-21 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4651:

Fix Version/s: 1.2.0

> Re-register processing time timers at the WindowOperator upon recovery.
> ---
>
> Key: FLINK-4651
> URL: https://issues.apache.org/jira/browse/FLINK-4651
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>  Labels: windows
> Fix For: 1.2.0, 1.1.3
>
>
> Currently the {{WindowOperator}} checkpoints the processing time timers, but 
> upon recovery it does not re-register them with the {{TimeServiceProvider}}. 
> To actually reprocess them, it relies on another element arriving that 
> registers a new timer for a future point in time. Although this is a realistic 
> assumption in long-running jobs, we can remove it by re-registering the 
> restored timers with the {{TimeServiceProvider}} in the {{open()}} method of 
> the {{WindowOperator}}.
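The proposed fix can be sketched as a restore-then-register pattern in open(). The classes below are hypothetical stand-ins for the WindowOperator and TimeServiceProvider, not the actual Flink code.

```java
import java.util.Collection;
import java.util.TreeSet;

// Sketch: timers restored from a checkpoint are re-registered in open(),
// so firing no longer depends on a new element arriving first.
public class TimerRestoreSketch {
    private final TreeSet<Long> restoredTimers = new TreeSet<>();
    private final TreeSet<Long> registeredWithTimeService = new TreeSet<>();

    /** Called during state restore with the checkpointed timer timestamps. */
    public void restoreState(Collection<Long> checkpointedTimers) {
        restoredTimers.addAll(checkpointedTimers);
    }

    /** open(): re-register every restored timer with the time service. */
    public void open() {
        for (long timestamp : restoredTimers) {
            registerProcessingTimeTimer(timestamp);
        }
    }

    private void registerProcessingTimeTimer(long timestamp) {
        // Stand-in for TimeServiceProvider.registerTimer(timestamp, target).
        registeredWithTimeService.add(timestamp);
    }

    public boolean isRegistered(long timestamp) {
        return registeredWithTimeService.contains(timestamp);
    }
}
```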





[jira] [Commented] (FLINK-4656) Port existing code to use Flink's future abstraction

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510296#comment-15510296
 ] 

ASF GitHub Bot commented on FLINK-4656:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2530

[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction

This pull request removes Scala's `Futures` from all new Flip-6 components 
and replaces them with Flink's own `Future` abstraction.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink useNewFutures

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2530.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2530


commit cc7cdd75d23f2d4d201c0458f4543be1bc7e09f0
Author: Till Rohrmann 
Date:   2016-09-21T15:26:21Z

[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction




> Port existing code to use Flink's future abstraction
> 
>
> Key: FLINK-4656
> URL: https://issues.apache.org/jira/browse/FLINK-4656
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> Port existing code to use Flink's future abstraction





[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state

2016-09-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510313#comment-15510313
 ] 

Aljoscha Krettek commented on FLINK-4616:
-

+1, I think this would be good to have.

> Kafka consumer doesn't store last emitted watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1
>Reporter: Yuri Makhno
> Fix For: 1.2.0, 1.1.3
>
>
> The Kafka consumer stores only Kafka offsets in state and doesn't store the 
> last emitted watermarks; this may lead to a wrong state when a checkpoint is 
> restored:
> Let's say our watermark is (timestamp - 10). Given the following message 
> queue, results will differ between a checkpoint restore and normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)
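The divergence described above can be sketched with a toy bounded-out-of-orderness model (watermark = max timestamp - 10). This is a deliberate simplification, not the Kafka consumer's actual code: if a restore keeps only offsets, the watermark resets to its initial value, so C(ts=15) is no longer treated as late after the restore.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkReplaySketch {
    static final long OUT_OF_ORDERNESS = 10L;

    // Returns the timestamps considered late for the given input, starting
    // from an initial watermark. Long.MIN_VALUE simulates a restore that
    // did not persist the watermark.
    static List<Long> lateElements(long[] timestamps, long initialWatermark) {
        long watermark = initialWatermark;
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts <= watermark) {
                late.add(ts);  // arrived behind the watermark
            } else {
                watermark = Math.max(watermark, ts - OUT_OF_ORDERNESS);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Normal processing: the watermark reaches 25 after B, so C(15) is late.
        System.out.println(lateElements(new long[]{30L, 35L, 15L, 60L}, Long.MIN_VALUE));
        // Replay from the checkpoint without the watermark: C(15) is not late.
        System.out.println(lateElements(new long[]{15L, 60L}, Long.MIN_VALUE));
    }
}
```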



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4248) CsvTableSource does not support reading SqlTimeTypeInfo types

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510349#comment-15510349
 ] 

ASF GitHub Bot commented on FLINK-4248:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2303
  
@fhueske I think we should keep the methods as they currently are. I 
renamed them to "nextNumericString"/"nextNumericStringEndPos" and added an 
explanation of what a numeric string is. You are right that they don't do much 
that is numeric at the moment, but only the numeric classes will use them. 
Furthermore, the error states and exceptions are numeric. E.g., if we don't do 
the whitespace checking in `nextNumericString`, we also need to return the 
position instead of the string.
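As a hypothetical illustration only (the helper name and semantics are guesses, not the code under review), a "numeric string" scan might accept digits and signs up to the field delimiter, fail on embedded whitespace, and report the end position:

```java
public class NumericStringSketch {
    // Hypothetical helper: scans from 'start' until the delimiter and returns
    // the end position, or -1 if a non-numeric character (e.g. embedded
    // whitespace) is hit first.
    static int nextNumericStringEndPos(byte[] bytes, int start, byte delimiter) {
        int i = start;
        while (i < bytes.length && bytes[i] != delimiter) {
            byte b = bytes[i];
            boolean numeric = (b >= '0' && b <= '9') || b == '-' || b == '+';
            if (!numeric) {
                return -1;  // parse error, e.g. "12 3"
            }
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        System.out.println(nextNumericStringEndPos("123|".getBytes(), 0, (byte) '|'));
        System.out.println(nextNumericStringEndPos("12 3|".getBytes(), 0, (byte) '|'));
    }
}
```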


> CsvTableSource does not support reading SqlTimeTypeInfo types
> -
>
> Key: FLINK-4248
> URL: https://issues.apache.org/jira/browse/FLINK-4248
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
>
> The Table API's {{CsvTableSource}} does not support reading all data types 
> supported by the Table API. For example, it is not possible to read 
> {{SqlTimeTypeInfo}} types via the {{CsvTableSource}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2303: [FLINK-4248] [core] [table] CsvTableSource does not suppo...

2016-09-21 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2303
  
@fhueske I think we should keep the methods as they currently are. I 
renamed them to "nextNumericString"/"nextNumericStringEndPos" and added an 
explanation of what a numeric string is. You are right that they don't do much 
that is numeric at the moment, but only the numeric classes will use them. 
Furthermore, the error states and exceptions are numeric. E.g., if we don't do 
the whitespace checking in `nextNumericString`, we also need to return the 
position instead of the string.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2530: [FLINK-4656] [rpc] Port the existing code to Flink...

2016-09-21 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2530

[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction

This pull request removes Scala's `Futures` from all new Flip-6 components 
and replaces them with Flink's own `Future` abstraction.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink useNewFutures

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2530.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2530


commit cc7cdd75d23f2d4d201c0458f4543be1bc7e09f0
Author: Till Rohrmann 
Date:   2016-09-21T15:26:21Z

[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4641) Support branching CEP patterns

2016-09-21 Thread Frank Dekervel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510291#comment-15510291
 ] 

Frank Dekervel commented on FLINK-4641:
---

I think the second option would make it hard to nest branching patterns. 
Will method chaining keep working when doing more complex state transition 
hierarchies? (I think supporting cyclic patterns like the following is out of 
scope for this ticket.)

E.g. (trying to make up a more complex example), suppose that one of the branches 
consists of two states, B and K, the system can go from one state back to the 
other, and the C state can be followedBy the D state; but if D happens after K, it 
should be next, not followedBy...

{noformat}
         |<----|
    |--> B --> K --|
A --               |--> D
    |--> C --------|
{noformat}

(would maybe only be useful together with an implementation for #3703)

Frank
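The branching structure discussed above can be modeled as a plain transition table, independent of the CEP `Pattern` API. This sketch only demonstrates the intended reachability (including the cycle between B and K), not a proposed API:

```java
import java.util.*;

public class BranchingNfaSketch {
    // Transition table for the branching pattern sketched above:
    // A -> B or C; B -> K; K -> B (cycle) or D; C -> D.
    static final Map<Character, Set<Character>> TRANSITIONS = new HashMap<>();
    static {
        TRANSITIONS.put('A', new HashSet<>(Arrays.asList('B', 'C')));
        TRANSITIONS.put('B', new HashSet<>(Arrays.asList('K')));
        TRANSITIONS.put('K', new HashSet<>(Arrays.asList('B', 'D')));
        TRANSITIONS.put('C', new HashSet<>(Arrays.asList('D')));
        TRANSITIONS.put('D', Collections.emptySet());
    }

    static boolean accepts(String sequence) {
        if (sequence.isEmpty() || sequence.charAt(0) != 'A') {
            return false;
        }
        for (int i = 0; i + 1 < sequence.length(); i++) {
            Set<Character> next = TRANSITIONS.get(sequence.charAt(i));
            if (next == null || !next.contains(sequence.charAt(i + 1))) {
                return false;
            }
        }
        return sequence.endsWith("D");
    }

    public static void main(String[] args) {
        System.out.println(accepts("ABKD"));    // accepted
        System.out.println(accepts("ABKBKD"));  // accepted: cycle between B and K
        System.out.println(accepts("ACD"));     // accepted
        System.out.println(accepts("ACKD"));    // rejected: C cannot reach K
    }
}
```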


> Support branching CEP patterns 
> ---
>
> Key: FLINK-4641
> URL: https://issues.apache.org/jira/browse/FLINK-4641
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Till Rohrmann
>
> We should add support for branching CEP patterns to the Pattern API. 
> {code}
>     |--> B --|
>     |        |
> A --|        |--> D
>     |        |
>     |--> C --|
> {code}
> This feature will require changes to the {{Pattern}} class and the 
> {{NFACompiler}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2434: [FLINK-4496] Refactor the TimeServiceProvider to t...

2016-09-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79854335
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -365,19 +364,6 @@ public Long getValue() {
}
}
 
-   /**
--- End diff --

This move also seems unnecessary. The current place in the code might not 
seem ideal but just moving around code does not help when trying to retrace the 
history of code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4496) Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510265#comment-15510265
 ] 

ASF GitHub Bot commented on FLINK-4496:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79854335
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -365,19 +364,6 @@ public Long getValue() {
}
}
 
-   /**
--- End diff --

This move also seems unnecessary. The current place in the code might not 
seem ideal but just moving around code does not help when trying to retrace the 
history of code.


> Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.
> 
>
> Key: FLINK-4496
> URL: https://issues.apache.org/jira/browse/FLINK-4496
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2434: [FLINK-4496] Refactor the TimeServiceProvider to t...

2016-09-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79858930
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
 ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TimeProviderTest {
+
+   @Test
+   public void testDefaultTimeProvider() throws InterruptedException {
+   final Object lock = new Object();
+   TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider
+   
.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+
+   final List timestamps = new ArrayList<>();
+
+   long start = System.currentTimeMillis();
+   long interval = 50L;
+
+   long noOfTimers = 5;
+   for (int i = 0; i < noOfTimers; i++) {
+   double nextTimer = start + i * interval;
+
+   timeServiceProvider.registerTimer((long) nextTimer, new 
Triggerable() {
+   @Override
+   public void trigger(long timestamp) throws 
Exception {
+   timestamps.add(timestamp);
+   }
+   });
+
+   // add also out-of-order tasks to verify that eventually
+   // they will be executed in the correct order.
+
+   if (i > 0) {
+   timeServiceProvider.registerTimer((long) 
(nextTimer - 10), new Triggerable() {
+   @Override
+   public void trigger(long timestamp) 
throws Exception {
+   timestamps.add(timestamp);
+   }
+   });
+   }
+   }
+
+   Thread.sleep(1000);
--- End diff --

Having a `Thread.sleep()` here is probably problematic when running on 
Travis: it might happen that not all timers fire within 1 second. Also, it 
always adds one second to the runtime of the test.

I think you can do the verification of correct firing order directly in the 
`trigger()` methods. You have an atomic variable outside of the scope of the 
timers that you check and update within the trigger methods. Outside, in the 
test you can use a `OneShotLatch` to wait on success. Once you detect success 
inside the trigger methods you signal that using `OneShotLatch.trigger()`.
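The suggested latch-based pattern can be sketched with the JDK's `CountDownLatch` standing in for Flink's `OneShotLatch` (an assumption made here purely to keep the example self-contained):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LatchTimerTestSketch {
    // Registers three timers and waits on a latch instead of Thread.sleep();
    // returns true if all timers fired (in order) before the timeout.
    static boolean runTimers() throws InterruptedException {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);        // stands in for OneShotLatch
        AtomicLong lastFired = new AtomicLong(Long.MIN_VALUE);
        long[] delays = {10L, 20L, 30L};
        for (long delay : delays) {
            timers.schedule(() -> {
                // verify firing order inside the callback itself
                if (delay < lastFired.get()) {
                    throw new IllegalStateException("fired out of order");
                }
                lastFired.set(delay);
                if (delay == 30L) {
                    done.countDown();                        // signal success
                }
            }, delay, TimeUnit.MILLISECONDS);
        }
        boolean finished = done.await(10, TimeUnit.SECONDS);
        timers.shutdownNow();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTimers() ? "all timers fired in order" : "timed out");
    }
}
```

The test blocks only as long as the timers actually need, rather than a fixed sleep.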


[GitHub] flink pull request #2434: [FLINK-4496] Refactor the TimeServiceProvider to t...

2016-09-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79854646
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -326,6 +322,9 @@ public Long getValue() {
// stop all timers and threads
if (timerService != null) {
try {
+   if (!timerService.isTerminated()) {
--- End diff --

Should the actual call to `shutdownService()` also be inside the if block?
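A minimal sketch of the guard being suggested, with a hypothetical stand-in for the timer service interface (not the real `TimeServiceProvider`):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedShutdownSketch {
    // Minimal stand-in for the timer service under review (hypothetical).
    interface TimerService {
        boolean isTerminated();
        void shutdownService();
    }

    // The reviewer's suggestion: keep the shutdownService() call inside the
    // !isTerminated() guard, so an already-terminated service is not shut
    // down a second time.
    static void closeTimerService(TimerService timerService) {
        if (timerService != null && !timerService.isTerminated()) {
            timerService.shutdownService();
        }
    }

    public static void main(String[] args) {
        AtomicInteger shutdowns = new AtomicInteger();
        TimerService alreadyStopped = new TimerService() {
            public boolean isTerminated() { return true; }
            public void shutdownService() { shutdowns.incrementAndGet(); }
        };
        closeTimerService(alreadyStopped);
        // the guard skipped the call, so the count stays at zero
        System.out.println("shutdown calls: " + shutdowns.get());
    }
}
```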


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4496) Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510266#comment-15510266
 ] 

ASF GitHub Bot commented on FLINK-4496:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79858930
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
 ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TimeProviderTest {
+
+   @Test
+   public void testDefaultTimeProvider() throws InterruptedException {
+   final Object lock = new Object();
+   TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider
+   
.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+
+   final List timestamps = new ArrayList<>();
+
+   long start = System.currentTimeMillis();
+   long interval = 50L;
+
+   long noOfTimers = 5;
+   for (int i = 0; i < noOfTimers; i++) {
+   double nextTimer = start + i * interval;
+
+   timeServiceProvider.registerTimer((long) nextTimer, new 
Triggerable() {
+   @Override
+   public void trigger(long timestamp) throws 
Exception {
+   timestamps.add(timestamp);
+   }
+   });
+
+   // add also out-of-order tasks to verify that eventually
+   // they will be executed in the correct order.
+
+   if (i > 0) {
+   timeServiceProvider.registerTimer((long) 
(nextTimer - 10), new Triggerable() {
+   @Override
+   public void trigger(long timestamp) 
throws Exception {
+   timestamps.add(timestamp);
+   }
+   });
+   }
+   }
+
+   Thread.sleep(1000);
--- End diff --

Having a `Thread.sleep()` here is probably problematic when running on 
Travis: it might happen that not all timers fire within 1 second. Also, it 
always adds one second to the runtime of the test.

I think you can do the verification of correct firing order directly in the 
`trigger()` methods. You have an atomic variable outside of the scope of the 
timers that you check and update within the trigger methods. Outside, in the 
test you can use a `OneShotLatch` to wait on success. Once you detect success 
inside the trigger methods you signal that using `OneShotLatch.trigger()`.

[jira] [Commented] (FLINK-4496) Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510267#comment-15510267
 ] 

ASF GitHub Bot commented on FLINK-4496:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79853936
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -212,10 +212,6 @@ public void setTimeService(TimeServiceProvider 
timeProvider) {
timerService = timeProvider;
}
 
-   public long getCurrentProcessingTime() {
--- End diff --

Moving this method does not seem necessary, especially since it's planned 
for removal in FLINK-4494. 


> Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.
> 
>
> Key: FLINK-4496
> URL: https://issues.apache.org/jira/browse/FLINK-4496
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4496) Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510268#comment-15510268
 ] 

ASF GitHub Bot commented on FLINK-4496:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79854449
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -486,6 +472,9 @@ private void disposeAllOperators() {
protected void finalize() throws Throwable {
super.finalize();
if (timerService != null) {
+   if (!timerService.isTerminated()) {
--- End diff --

Should the actual call to `shutdownService()` also be inside the if block?


> Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.
> 
>
> Key: FLINK-4496
> URL: https://issues.apache.org/jira/browse/FLINK-4496
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2434: [FLINK-4496] Refactor the TimeServiceProvider to t...

2016-09-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79854449
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -486,6 +472,9 @@ private void disposeAllOperators() {
protected void finalize() throws Throwable {
super.finalize();
if (timerService != null) {
+   if (!timerService.isTerminated()) {
--- End diff --

Should the actual call to `shutdownService()` also be inside the if block?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4496) Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510269#comment-15510269
 ] 

ASF GitHub Bot commented on FLINK-4496:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79854646
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -326,6 +322,9 @@ public Long getValue() {
// stop all timers and threads
if (timerService != null) {
try {
+   if (!timerService.isTerminated()) {
--- End diff --

Should the actual call to `shutdownService()` also be inside the if block?


> Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.
> 
>
> Key: FLINK-4496
> URL: https://issues.apache.org/jira/browse/FLINK-4496
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2434: [FLINK-4496] Refactor the TimeServiceProvider to t...

2016-09-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2434#discussion_r79853936
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
@@ -212,10 +212,6 @@ public void setTimeService(TimeServiceProvider 
timeProvider) {
timerService = timeProvider;
}
 
-   public long getCurrentProcessingTime() {
--- End diff --

Moving this method does not seem necessary, especially since it's planned 
for removal in FLINK-4494. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4241) Cryptic expression parser exceptions

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510252#comment-15510252
 ] 

ASF GitHub Bot commented on FLINK-4241:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2529

[FLINK-4241] [table] Cryptic expression parser exceptions

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR improves the error message of the `ExpressionParser`. I tried my 
best but it is not easy to improve the message as we are using `RegexParsers`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4241

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2529.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2529


commit 7d8d69f8b52983ccf078adfa2a78bf0855ec7cf9
Author: twalthr 
Date:   2016-09-21T15:12:31Z

[FLINK-4241] [table] Cryptic expression parser exceptions




> Cryptic expression parser exceptions
> 
>
> Key: FLINK-4241
> URL: https://issues.apache.org/jira/browse/FLINK-4241
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
>
> The exceptions thrown when giving wrong SQL syntax to Flink's SQL parser are 
> very cryptic and should be improved. For example, the following code snippet:
> {code}
> inputTable.filter("a == 0");
> {code}
> gives the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.ExpressionParserException: Could not parse 
> expression: [1.4] failure: `-' expected but `=' found
> a == 0
>^
>   at 
> org.apache.flink.api.table.expressions.ExpressionParser$.parseExpression(ExpressionParser.scala:355)
>   at org.apache.flink.api.table.Table.filter(table.scala:161)
>   at 
> com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:32)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
> From this description it is very hard to understand that {{==}} is not a 
> valid operator.
> Another example is:
> {code}
> inputTable.select("*");
> {code}
> which gives
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.ExpressionParserException: Could not parse 
> expression: Base Failure
>   at 
> org.apache.flink.api.table.expressions.ExpressionParser$.parseExpressionList(ExpressionParser.scala:342)
>   at org.apache.flink.api.table.Table.select(table.scala:103)
>   at 
> com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:33)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
> I think it would considerably improve user experience if we print more 
> helpful parsing exceptions.
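One hedged sketch of how such a failure could be post-processed into a friendlier message (the helper and the hint text are invented for illustration; this is not the `ExpressionParser` change in the PR):

```java
public class ParserErrorSketch {
    // Hypothetical post-processing of the raw parser failure: echo the
    // expression, point at the 1-based failure column, and add a hint when
    // a known-unsupported operator such as '==' is present.
    static String improveMessage(String expression, int failurePos, String rawMessage) {
        StringBuilder sb = new StringBuilder();
        sb.append("Could not parse expression \"").append(expression)
          .append("\": ").append(rawMessage).append('\n')
          .append(expression).append('\n');
        for (int i = 1; i < failurePos; i++) {
            sb.append(' ');
        }
        sb.append('^');
        if (expression.contains("==") && !expression.contains("===")) {
            sb.append("\nHint: '==' is not a valid operator here.");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // "[1.4]" in the original stack trace means line 1, column 4
        System.out.println(improveMessage("a == 0", 4, "`-' expected but `=' found"));
    }
}
```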



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2529: [FLINK-4241] [table] Cryptic expression parser exc...

2016-09-21 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2529

[FLINK-4241] [table] Cryptic expression parser exceptions

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR improves the error message of the `ExpressionParser`. I tried my 
best but it is not easy to improve the message as we are using `RegexParsers`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4241

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2529.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2529


commit 7d8d69f8b52983ccf078adfa2a78bf0855ec7cf9
Author: twalthr 
Date:   2016-09-21T15:12:31Z

[FLINK-4241] [table] Cryptic expression parser exceptions




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-21 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-4657:
--
Description: 
For flip-6, we will have the ResourceManager and every JobManager as potential 
leader contenders and retrievers. We should separate them by using different 
ZooKeeper paths. 
For example, the path could be /leader/resource-manager for the RM, and for each 
JM the path could be /leader/job-managers/JobID

  was:
For flip-6, we will have ResourceManager and every JobManager as potential 
leader contender and retriever. We should separate them by using different 
zookeeper path. 
For example, the path could be /leader/resource-manaeger for RM. And for each 
JM, the path could be /leader/job-managers/{JobID}


> Implement HighAvailabilityServices based on zookeeper
> -
>
> Key: FLINK-4657
> URL: https://issues.apache.org/jira/browse/FLINK-4657
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> For flip-6, we will have the ResourceManager and every JobManager as potential 
> leader contenders and retrievers. We should separate them by using different 
> ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for the RM, and for each 
> JM, the path could be /leader/job-managers/JobID
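The path scheme described above amounts to the following (a minimal sketch; the constant and helper names are illustrative, the sample JobID is made up, and the "resource-manager" spelling is normalized):

```java
public class LeaderPathsSketch {
    // Distinct ZooKeeper leader paths: one fixed path for the
    // ResourceManager, one per-job path for each JobManager.
    static final String RESOURCE_MANAGER_LEADER_PATH = "/leader/resource-manager";

    static String jobManagerLeaderPath(String jobId) {
        return "/leader/job-managers/" + jobId;
    }

    public static void main(String[] args) {
        System.out.println(RESOURCE_MANAGER_LEADER_PATH);
        System.out.println(jobManagerLeaderPath("a1b2c3"));  // "a1b2c3" is a made-up JobID
    }
}
```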



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2525: [FLINK-4654] Small improvements to the docs.

2016-09-21 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2525
  
Thank you for the contribution @alpinegizmo.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4654) clean up docs

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510200#comment-15510200
 ] 

ASF GitHub Bot commented on FLINK-4654:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2525
  
Thank you for the contribution @alpinegizmo.


> clean up docs
> -
>
> Key: FLINK-4654
> URL: https://issues.apache.org/jira/browse/FLINK-4654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Priority: Trivial
>  Labels: documentation
> Fix For: 1.2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> There are some minor but distracting glitches in the docs -- typos, awkward 
> phrases, broken links.





[GitHub] flink pull request #2528: [FLINK-4643] [gelly] Average Clustering Coefficien...

2016-09-21 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2528

[FLINK-4643] [gelly] Average Clustering Coefficient

Questions:
- Can we generalize "average" to operate on a common interface (e.g. 
"ScorableResult")? Here, the average clustering coefficient is only computed 
over vertices with minimum degree two.
- Would we be better off writing Gelly drivers in Scala?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4643_average_clustering_coefficient

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2528.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2528


commit 773d2420916bca249e38047a21a1a1eb14db7fd2
Author: Greg Hogan 
Date:   2016-09-20T16:00:04Z

[FLINK-4643] [gelly] Average Clustering Coefficient

Directed and undirected analytics computing the average clustering
coefficient over vertices in a graph and an updated driver.






[jira] [Commented] (FLINK-4643) Average Clustering Coefficient

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510186#comment-15510186
 ] 

ASF GitHub Bot commented on FLINK-4643:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2528

[FLINK-4643] [gelly] Average Clustering Coefficient

Questions:
- Can we generalize "average" to operate on a common interface (e.g. 
"ScorableResult")? Here, the average clustering coefficient is only computed 
over vertices with minimum degree two.
- Would we be better off writing Gelly drivers in Scala?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4643_average_clustering_coefficient

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2528.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2528


commit 773d2420916bca249e38047a21a1a1eb14db7fd2
Author: Greg Hogan 
Date:   2016-09-20T16:00:04Z

[FLINK-4643] [gelly] Average Clustering Coefficient

Directed and undirected analytics computing the average clustering
coefficient over vertices in a graph and an updated driver.




> Average Clustering Coefficient
> --
>
> Key: FLINK-4643
> URL: https://issues.apache.org/jira/browse/FLINK-4643
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Gelly has Global Clustering Coefficient and Local Clustering Coefficient. 
> This adds Average Clustering Coefficient. The distinction is discussed in 
> [http://jponnela.com/web_documents/twomode.pdf] (pdf page 2, document page 
> 32).
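For intuition, the metric can be sketched in plain Java, independently of Gelly's actual implementation (all names here are illustrative). The degree-two cutoff mirrors the note in the pull request that the average is only taken over vertices with at least two neighbors:

```java
import java.util.*;

final class AvgClusteringCoefficient {

    // Local clustering coefficient of v: fraction of neighbor pairs of v
    // that are themselves connected by an edge.
    static double local(Map<Integer, Set<Integer>> g, int v) {
        List<Integer> nbrs = new ArrayList<>(g.get(v));
        int d = nbrs.size();
        if (d < 2) {
            return 0.0;
        }
        int links = 0;
        for (int i = 0; i < d; i++) {
            for (int j = i + 1; j < d; j++) {
                if (g.get(nbrs.get(i)).contains(nbrs.get(j))) {
                    links++;
                }
            }
        }
        return 2.0 * links / (d * (d - 1));
    }

    // Average over vertices with degree >= 2; lower-degree vertices are
    // excluded, matching the PR description.
    static double average(Map<Integer, Set<Integer>> g) {
        double sum = 0;
        int n = 0;
        for (int v : g.keySet()) {
            if (g.get(v).size() >= 2) {
                sum += local(g, v);
                n++;
            }
        }
        return n == 0 ? 0.0 : sum / n;
    }
}
```

This is only a reference for the definition; the Gelly analytic computes the same quantity in a distributed fashion.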





[jira] [Created] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper

2016-09-21 Thread Kurt Young (JIRA)
Kurt Young created FLINK-4657:
-

 Summary: Implement HighAvailabilityServices based on zookeeper
 Key: FLINK-4657
 URL: https://issues.apache.org/jira/browse/FLINK-4657
 Project: Flink
  Issue Type: New Feature
  Components: Cluster Management
Reporter: Kurt Young
Assignee: Kurt Young


For flip-6, we will have ResourceManager and every JobManager as potential 
leader contender and retriever. We should separate them by using different 
zookeeper path. 
For example, the path could be /leader/resource-manager for the RM, and for each 
JM the path could be /leader/job-managers/{JobID}
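The proposed split can be sketched as a small path-layout helper. The class name and path constants below are illustrative only, not Flink's actual implementation:

```java
// Sketch of per-component ZooKeeper leader paths: one shared path for the
// ResourceManager, and one path per job so each JobManager's leader
// election and retrieval are independent of the others.
final class LeaderPaths {

    private static final String ROOT = "/leader";

    // All RM contenders compete under a single, well-known path.
    static String resourceManagerPath() {
        return ROOT + "/resource-manager";
    }

    // Each job gets its own path, keyed by its JobID.
    static String jobManagerPath(String jobId) {
        return ROOT + "/job-managers/" + jobId;
    }
}
```

In the real implementation these paths would be handed to the ZooKeeper-backed leader election and retrieval services for each component.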





[jira] [Commented] (FLINK-4654) clean up docs

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510166#comment-15510166
 ] 

ASF GitHub Bot commented on FLINK-4654:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2525
  
No worries.  


> clean up docs
> -
>
> Key: FLINK-4654
> URL: https://issues.apache.org/jira/browse/FLINK-4654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Priority: Trivial
>  Labels: documentation
> Fix For: 1.2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> There are some minor but distracting glitches in the docs -- typos, awkward 
> phrases, broken links.





[GitHub] flink issue #2525: [FLINK-4654] Small improvements to the docs.

2016-09-21 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2525
  
No worries. 😃 




[jira] [Closed] (FLINK-4654) clean up docs

2016-09-21 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-4654.
-
Resolution: Implemented

Implemented in 7212202036235f41e376872dc268735ba9ef81e9

> clean up docs
> -
>
> Key: FLINK-4654
> URL: https://issues.apache.org/jira/browse/FLINK-4654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Priority: Trivial
>  Labels: documentation
> Fix For: 1.2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> There are some minor but distracting glitches in the docs -- typos, awkward 
> phrases, broken links.





[jira] [Commented] (FLINK-4654) clean up docs

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510155#comment-15510155
 ] 

ASF GitHub Bot commented on FLINK-4654:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2525


> clean up docs
> -
>
> Key: FLINK-4654
> URL: https://issues.apache.org/jira/browse/FLINK-4654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Priority: Trivial
>  Labels: documentation
> Fix For: 1.2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> There are some minor but distracting glitches in the docs -- typos, awkward 
> phrases, broken links.





[GitHub] flink pull request #2525: [FLINK-4654] Small improvements to the docs.

2016-09-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2525




[jira] [Commented] (FLINK-4654) clean up docs

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510153#comment-15510153
 ] 

ASF GitHub Bot commented on FLINK-4654:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2525
  
Oh no! I just committed.


> clean up docs
> -
>
> Key: FLINK-4654
> URL: https://issues.apache.org/jira/browse/FLINK-4654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Priority: Trivial
>  Labels: documentation
> Fix For: 1.2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> There are some minor but distracting glitches in the docs -- typos, awkward 
> phrases, broken links.





[GitHub] flink issue #2525: [FLINK-4654] Small improvements to the docs.

2016-09-21 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2525
  
Oh no! I just committed.




[jira] [Updated] (FLINK-4654) clean up docs

2016-09-21 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-4654:
--
Fix Version/s: 1.2.0

> clean up docs
> -
>
> Key: FLINK-4654
> URL: https://issues.apache.org/jira/browse/FLINK-4654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Priority: Trivial
>  Labels: documentation
> Fix For: 1.2.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> There are some minor but distracting glitches in the docs -- typos, awkward 
> phrases, broken links.





[jira] [Commented] (FLINK-4379) Add Rescalable Non-Partitioned State

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510114#comment-15510114
 ] 

ASF GitHub Bot commented on FLINK-4379:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2512
  
I think most of the proposed renames are good. One thing that would be good 
to change is to remove `SnapshotProvider` and just have that method directly on 
`OperatorStateBackend`.

I really like the separation between the user facing state store and the 
backend. I've been meaning to change that for keyed state as well, i.e. the 
user only gets a way to access state and not all the methods for snapshotting, 
closing, etc. from the backend.

When I say I like the changes, I mean everything except that they have 
`Operator` in all of them. In the end, all state is at an operator, plus there 
is the existing interface `OperatorState` that is an alias for `ValueState`. I 
don't have a good alternative now, but I think the `Operator` could be 
confusing.


> Add Rescalable Non-Partitioned State
> 
>
> Key: FLINK-4379
> URL: https://issues.apache.org/jira/browse/FLINK-4379
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Stefan Richter
>
> This issue is associated with [FLIP-8| 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-8%3A+Rescalable+Non-Partitioned+State].





[GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state

2016-09-21 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2512
  
I think most of the proposed renames are good. One thing that would be good 
to change is to remove `SnapshotProvider` and just have that method directly on 
`OperatorStateBackend`.

I really like the separation between the user facing state store and the 
backend. I've been meaning to change that for keyed state as well, i.e. the 
user only gets a way to access state and not all the methods for snapshotting, 
closing, etc. from the backend.

When I say I like the changes, I mean everything except that they have 
`Operator` in all of them. In the end, all state is at an operator, plus there 
is the existing interface `OperatorState` that is an alias for `ValueState`. I 
don't have a good alternative now, but I think the `Operator` could be 
confusing.
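The separation being praised might look roughly like this. All interface names are illustrative, not Flink's actual API; the point is that user code sees only state access, while snapshotting sits directly on the backend, as the review suggests:

```java
import java.util.*;

// User-facing view: only a way to access state, nothing else.
interface StateStore {
    <T> List<T> listState(String name);
}

// Backend view: state access plus lifecycle concerns. snapshot() lives
// here directly rather than on a separate SnapshotProvider interface.
interface StateBackend extends StateStore {
    int snapshot(); // toy "snapshot": returns the number of registered states
}

// Minimal in-memory implementation to make the separation concrete.
final class InMemoryBackend implements StateBackend {

    private final Map<String, List<Object>> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    public <T> List<T> listState(String name) {
        return (List<T>) states.computeIfAbsent(name, k -> new ArrayList<>());
    }

    public int snapshot() {
        return states.size();
    }
}
```

User functions would be handed only the `StateStore` view, so they cannot call snapshot or disposal methods by accident.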




[jira] [Commented] (FLINK-4624) Gelly's summarization algorithm cannot deal with null vertex group values

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510069#comment-15510069
 ] 

ASF GitHub Bot commented on FLINK-4624:
---

GitHub user s1ck opened a pull request:

https://github.com/apache/flink/pull/2527

[FLINK-4624] Allow for null values in Graph Summarization

* Bug was caused by serializers that cannot handle null values (e.g. Long)
* VertexGroupItem now uses Either instead of VV
* Generalized test cases
* Added tests for vertex/edge values of type Long
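The Either-based fix can be illustrated with a minimal stand-in. The PR uses Flink's real `org.apache.flink.types.Either`; the toy class below (my naming) only shows why the trick works: the "left" case is an explicit null marker, so a null-hostile value serializer (e.g. for `Long`) is never asked to encode null:

```java
// Toy stand-in for Either<NullValue, V> in the summarization fix.
final class MaybeValue<V> {

    private final boolean isNull; // "left": explicit null marker
    private final V value;        // "right": a real vertex group value

    private MaybeValue(boolean isNull, V value) {
        this.isNull = isNull;
        this.value = value;
    }

    // The null case carries no V at all, so the V serializer is bypassed.
    static <V> MaybeValue<V> nullValue() {
        return new MaybeValue<>(true, null);
    }

    static <V> MaybeValue<V> of(V value) {
        return new MaybeValue<>(false, value);
    }

    boolean isNull() {
        return isNull;
    }

    // Only legal when isNull() is false, mirroring Either.right().
    V get() {
        if (isNull) {
            throw new IllegalStateException("no value present");
        }
        return value;
    }
}
```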

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/s1ck/flink FLINK-4624

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2527.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2527


commit e6db894d6b84cf95206905a6f1a6713f78e32988
Author: Martin Junghanns 
Date:   2016-09-21T11:31:41Z

[FLINK-4624] Support null values in Graph Summarization

* Bug was caused by serializers that cannot handle null values (e.g. Long)
* VertexGroupItem now uses Either instead of VV
* Generalized test cases
* Added tests for vertex/edge values of type Long




> Gelly's summarization algorithm cannot deal with null vertex group values
> -
>
> Key: FLINK-4624
> URL: https://issues.apache.org/jira/browse/FLINK-4624
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Reporter: Till Rohrmann
>Assignee: Martin Junghanns
> Fix For: 1.2.0
>
>
> Gelly's {{Summarization}} algorithm cannot handle null values in the 
> `VertexGroupItem.f2`. This behaviour is hidden by using Strings as a vertex 
> value in the {{SummarizationITCase}}, because the {{StringSerializer}} can 
> handle null values. 





[GitHub] flink pull request #2527: [FLINK-4624] Allow for null values in Graph Summar...

2016-09-21 Thread s1ck
GitHub user s1ck opened a pull request:

https://github.com/apache/flink/pull/2527

[FLINK-4624] Allow for null values in Graph Summarization

* Bug was caused by serializers that cannot handle null values (e.g. Long)
* VertexGroupItem now uses Either instead of VV
* Generalized test cases
* Added tests for vertex/edge values of type Long

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/s1ck/flink FLINK-4624

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2527.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2527


commit e6db894d6b84cf95206905a6f1a6713f78e32988
Author: Martin Junghanns 
Date:   2016-09-21T11:31:41Z

[FLINK-4624] Support null values in Graph Summarization

* Bug was caused by serializers that cannot handle null values (e.g. Long)
* VertexGroupItem now uses Either instead of VV
* Generalized test cases
* Added tests for vertex/edge values of type Long






[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-21 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509946#comment-15509946
 ] 

Anton Mushin commented on FLINK-4564:
-

It's not entirely clear to me.
What should the MetricGroup#getMetricIdentifier() call look like in this case? 
If we use a single reporter alongside the other reporters and know their indexes, 
we must know the name or index at the point where we call 
MetricGroup#getMetricIdentifier(), and then MetricRegistry needs a new method for 
getting the indexes of reporters by name. Or is this assumption incorrect?

> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.
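The fallback described above can be sketched in a few lines (a hypothetical helper, not the actual Flink API):

```java
// Hypothetical per-reporter delimiter resolution: a reporter-specific
// setting wins; otherwise the existing global setting remains the default.
final class DelimiterResolver {

    static String delimiterFor(String reporterDelimiter, String globalDelimiter) {
        return reporterDelimiter != null ? reporterDelimiter : globalDelimiter;
    }
}
```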





[jira] [Commented] (FLINK-4248) CsvTableSource does not support reading SqlTimeTypeInfo types

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509930#comment-15509930
 ] 

ASF GitHub Bot commented on FLINK-4248:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2303#discussion_r79833586
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -174,6 +174,62 @@ protected void setErrorState(ParseErrorState error) {
	public ParseErrorState getErrorState() {
		return this.errorState;
	}
+
+	/**
+	 * Returns the end position of a string with a numeric format (like -XX-XX). Sets the error state if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the end position of the string or -1 if an error occurred
+	 */
+	public final int formattedStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
+		int len = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (len < limit) {
+			if (len < delimLimit && delimiterNext(bytes, len, delimiter)) {
+				if (len == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			len++;
+		}
+
+		if (len > startPos &&
+			(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(len - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		return len;
+	}
+
+	/**
+	 * Returns a string with a numeric format (like -XX-XX). Throws an exception if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the parsed string
+	 */
+	public static final String formattedString(byte[] bytes, int startPos, int length, char delimiter) {
+		if (length <= 0) {
+			throw new NumberFormatException("Invalid input: Empty string");
--- End diff --

But this is not the `parseField` method. `parseField` could catch the 
exception and pass it on as a `NumberFormatException`.


> CsvTableSource does not support reading SqlTimeTypeInfo types
> -
>
> Key: FLINK-4248
> URL: https://issues.apache.org/jira/browse/FLINK-4248
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
>
> The Table API's {{CsvTableSource}} does not support to read all Table API 
> supported data types. For example, it is not possible to read 
> {{SqlTimeTypeInfo}} types via the {{CsvTableSource}}.





[jira] [Commented] (FLINK-4248) CsvTableSource does not support reading SqlTimeTypeInfo types

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509931#comment-15509931
 ] 

ASF GitHub Bot commented on FLINK-4248:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2303#discussion_r79833722
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -174,6 +174,62 @@ protected void setErrorState(ParseErrorState error) {
	public ParseErrorState getErrorState() {
		return this.errorState;
	}
+
+	/**
+	 * Returns the end position of a string with a numeric format (like -XX-XX). Sets the error state if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the end position of the string or -1 if an error occurred
+	 */
+	public final int formattedStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
+		int len = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (len < limit) {
+			if (len < delimLimit && delimiterNext(bytes, len, delimiter)) {
+				if (len == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			len++;
+		}
+
+		if (len > startPos &&
+			(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(len - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		return len;
+	}
+
+	/**
+	 * Returns a string with a numeric format (like -XX-XX). Throws an exception if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the parsed string
+	 */
+	public static final String formattedString(byte[] bytes, int startPos, int length, char delimiter) {
+		if (length <= 0) {
+			throw new NumberFormatException("Invalid input: Empty string");
+		}
+		int i = 0;
+		final byte delByte = (byte) delimiter;
+
+		while (i < length && bytes[startPos + i] != delByte) {
+			i++;
+		}
+
+		if (i > 0 &&
+			(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
--- End diff --

But it does not do anything related to numeric values, or am I overlooking 
something?


> CsvTableSource does not support reading SqlTimeTypeInfo types
> -
>
> Key: FLINK-4248
> URL: https://issues.apache.org/jira/browse/FLINK-4248
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
>
> The Table API's {{CsvTableSource}} does not support to read all Table API 
> supported data types. For example, it is not possible to read 
> {{SqlTimeTypeInfo}} types via the {{CsvTableSource}}.





[jira] [Commented] (FLINK-4248) CsvTableSource does not support reading SqlTimeTypeInfo types

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509929#comment-15509929
 ] 

ASF GitHub Bot commented on FLINK-4248:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2303#discussion_r79833219
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -174,6 +174,62 @@ protected void setErrorState(ParseErrorState error) {
	public ParseErrorState getErrorState() {
		return this.errorState;
	}
+
+	/**
+	 * Returns the end position of a string with a numeric format (like -XX-XX). Sets the error state if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the end position of the string or -1 if an error occurred
+	 */
+	public final int formattedStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
+		int len = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (len < limit) {
+			if (len < delimLimit && delimiterNext(bytes, len, delimiter)) {
+				if (len == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			len++;
+		}
+
+		if (len > startPos &&
+			(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(len - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		return len;
+	}
+
+	/**
+	 * Returns a string with a numeric format (like -XX-XX). Throws an exception if the
--- End diff --

It returns a plain string object and only checks whether the string has 
leading or trailing whitespace. There are no checks for numeric characters or 
similar. I think the method should be named according to what it does, not 
who the intended caller is.


> CsvTableSource does not support reading SqlTimeTypeInfo types
> -
>
> Key: FLINK-4248
> URL: https://issues.apache.org/jira/browse/FLINK-4248
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
>
> The Table API's {{CsvTableSource}} does not support to read all Table API 
> supported data types. For example, it is not possible to read 
> {{SqlTimeTypeInfo}} types via the {{CsvTableSource}}.





[GitHub] flink pull request #2303: [FLINK-4248] [core] [table] CsvTableSource does no...

2016-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2303#discussion_r79833586
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -174,6 +174,62 @@ protected void setErrorState(ParseErrorState error) {
	public ParseErrorState getErrorState() {
		return this.errorState;
	}
+
+	/**
+	 * Returns the end position of a string with a numeric format (like -XX-XX). Sets the error state if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the end position of the string or -1 if an error occurred
+	 */
+	public final int formattedStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
+		int len = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (len < limit) {
+			if (len < delimLimit && delimiterNext(bytes, len, delimiter)) {
+				if (len == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			len++;
+		}
+
+		if (len > startPos &&
+			(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(len - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		return len;
+	}
+
+	/**
+	 * Returns a string with a numeric format (like -XX-XX). Throws an exception if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the parsed string
+	 */
+	public static final String formattedString(byte[] bytes, int startPos, int length, char delimiter) {
+		if (length <= 0) {
+			throw new NumberFormatException("Invalid input: Empty string");
--- End diff --

But this is not the `parseField` method. `parseField` could catch the 
exception and pass it on as a `NumberFormatException`.




[GitHub] flink pull request #2303: [FLINK-4248] [core] [table] CsvTableSource does no...

2016-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2303#discussion_r79833219
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -174,6 +174,62 @@ protected void setErrorState(ParseErrorState error) {
	public ParseErrorState getErrorState() {
		return this.errorState;
	}
+
+	/**
+	 * Returns the end position of a string with a numeric format (like -XX-XX). Sets the error state if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the end position of the string or -1 if an error occurred
+	 */
+	public final int formattedStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
+		int len = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (len < limit) {
+			if (len < delimLimit && delimiterNext(bytes, len, delimiter)) {
+				if (len == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			len++;
+		}
+
+		if (len > startPos &&
+			(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(len - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		return len;
+	}
+
+	/**
+	 * Returns a string with a numeric format (like -XX-XX). Throws an exception if the
--- End diff --

It returns a plain string object and only checks whether the string has 
leading or trailing whitespace. There are no checks for numeric characters or 
similar. I think the method should be named according to what it does, not 
who the intended caller is.




[GitHub] flink pull request #2303: [FLINK-4248] [core] [table] CsvTableSource does no...

2016-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2303#discussion_r79833722
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---
@@ -174,6 +174,62 @@ protected void setErrorState(ParseErrorState error) {
	public ParseErrorState getErrorState() {
		return this.errorState;
	}
+
+	/**
+	 * Returns the end position of a string with a numeric format (like -XX-XX). Sets the error state if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the end position of the string or -1 if an error occurred
+	 */
+	public final int formattedStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
+		int len = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (len < limit) {
+			if (len < delimLimit && delimiterNext(bytes, len, delimiter)) {
+				if (len == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			len++;
+		}
+
+		if (len > startPos &&
+			(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(len - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		return len;
+	}
+
+	/**
+	 * Returns a string with a numeric format (like -XX-XX). Throws an exception if the
+	 * string contains leading/trailing whitespaces or if the column is empty.
+	 *
+	 * @return the parsed string
+	 */
+	public static final String formattedString(byte[] bytes, int startPos, int length, char delimiter) {
+		if (length <= 0) {
+			throw new NumberFormatException("Invalid input: Empty string");
+		}
+		int i = 0;
+		final byte delByte = (byte) delimiter;
+
+		while (i < length && bytes[startPos + i] != delByte) {
+			i++;
+		}
+
+		if (i > 0 &&
+			(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
--- End diff --

But it does not do anything related to numeric values, or am I overlooking 
something?
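To illustrate the review point, here is a standalone reproduction of the `formattedString` logic from the diff above. The method only rejects leading/trailing whitespace and empty input; a clearly non-numeric value passes through unchanged. Note the diff is cut off before the method's return statement, so the final `return new String(...)` line below is an assumption about how the method completes.

```java
// Standalone sketch of formattedString as shown in the diff; the final
// return statement is assumed, since the quoted diff ends at the throw.
public class FormattedStringDemo {

    static String formattedString(byte[] bytes, int startPos, int length, char delimiter) {
        if (length <= 0) {
            throw new NumberFormatException("Invalid input: Empty string");
        }
        int i = 0;
        final byte delByte = (byte) delimiter;

        // Scan up to the delimiter (or the end of the field).
        while (i < length && bytes[startPos + i] != delByte) {
            i++;
        }

        // The only validation performed: reject leading/trailing whitespace.
        if (i > 0 &&
                (Character.isWhitespace(bytes[startPos]) ||
                 Character.isWhitespace(bytes[startPos + i - 1]))) {
            throw new NumberFormatException(
                "There is leading or trailing whitespace in the numeric field.");
        }
        // Assumed completion: return the scanned bytes as a plain String.
        return new String(bytes, startPos, i);
    }

    public static void main(String[] args) {
        // "hello" is not numeric at all, yet it is accepted:
        System.out.println(formattedString("hello,world".getBytes(), 0, 11, ','));
    }
}
```

Running this prints `hello`, which backs the reviewer's observation that nothing numeric-specific is checked.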




[jira] [Created] (FLINK-4656) Port existing code to use Flink's future abstraction

2016-09-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4656:


 Summary: Port existing code to use Flink's future abstraction
 Key: FLINK-4656
 URL: https://issues.apache.org/jira/browse/FLINK-4656
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Reporter: Till Rohrmann
Assignee: Till Rohrmann


Port existing code to use Flink's future abstraction



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4580) Check that the RpcEndpoint supports the specified RpcGateway

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509900#comment-15509900
 ] 

ASF GitHub Bot commented on FLINK-4580:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2526

[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller

Reports rpc invocation exceptions back to the caller.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink rpcErrorReporting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2526.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2526


commit b52fb356d48850453956d2f6c4e52ba8fe2da8ab
Author: Till Rohrmann 
Date:   2016-09-21T13:18:27Z

[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller




> Check that the RpcEndpoint supports the specified RpcGateway
> 
>
> Key: FLINK-4580
> URL: https://issues.apache.org/jira/browse/FLINK-4580
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> When calling {{RpcService.connect}} the user specifies the type of the 
> {{RpcGateway}}. At the moment, it is not checked whether the {{RpcEndpoint}} 
> actually supports the specified {{RpcGateway}}.
> I think it would be good to add a runtime check that the corresponding 
> {{RpcEndpoint}} supports the specified {{RpcGateway}}. If not, then we can 
> let the connect method fail fast.
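The fail-fast check proposed in the issue boils down to a reflective type test before the connection is handed out. The sketch below is illustrative only; the class and method names are hypothetical and do not reflect Flink's actual `RpcService` API.

```java
import java.util.Objects;

// Hypothetical sketch of the proposed runtime check: verify that the
// endpoint actually implements the requested gateway interface, and fail
// fast with a descriptive exception if it does not.
public class GatewayCheck {

    static <G> G checkedGateway(Object endpoint, Class<G> gatewayClass) {
        Objects.requireNonNull(endpoint);
        if (!gatewayClass.isAssignableFrom(endpoint.getClass())) {
            throw new IllegalArgumentException(
                "Endpoint " + endpoint.getClass().getName()
                + " does not support gateway " + gatewayClass.getName());
        }
        return gatewayClass.cast(endpoint);
    }

    // Minimal stand-ins for an endpoint/gateway pair.
    interface PingGateway { }
    static class PingEndpoint implements PingGateway { }

    public static void main(String[] args) {
        PingGateway g = checkedGateway(new PingEndpoint(), PingGateway.class);
        System.out.println(g != null); // prints "true"
    }
}
```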





[GitHub] flink pull request #2526: [FLINK-4580] [rpc] Report rpc invocation exception...

2016-09-21 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2526

[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller

Reports rpc invocation exceptions back to the caller.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink rpcErrorReporting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2526.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2526


commit b52fb356d48850453956d2f6c4e52ba8fe2da8ab
Author: Till Rohrmann 
Date:   2016-09-21T13:18:27Z

[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller






[jira] [Commented] (FLINK-4580) Check that the RpcEndpoint supports the specified RpcGateway

2016-09-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509897#comment-15509897
 ] 

Till Rohrmann commented on FLINK-4580:
--

Adding this check lazily when executing the rpc call.

> Check that the RpcEndpoint supports the specified RpcGateway
> 
>
> Key: FLINK-4580
> URL: https://issues.apache.org/jira/browse/FLINK-4580
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> When calling {{RpcService.connect}} the user specifies the type of the 
> {{RpcGateway}}. At the moment, it is not checked whether the {{RpcEndpoint}} 
> actually supports the specified {{RpcGateway}}.
> I think it would be good to add a runtime check that the corresponding 
> {{RpcEndpoint}} supports the specified {{RpcGateway}}. If not, then we can 
> let the connect method fail fast.





[jira] [Created] (FLINK-4655) Add tests for validation of Expressions

2016-09-21 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4655:
---

 Summary: Add tests for validation of Expressions
 Key: FLINK-4655
 URL: https://issues.apache.org/jira/browse/FLINK-4655
 Project: Flink
  Issue Type: Test
  Components: Table API & SQL
Reporter: Timo Walther


Currently, Table API expressions are only tested with valid input. The 
validation method of expressions is not tested. The {{ExpressionTestBase}} 
should be extended to provide means to also test invalid expressions.
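The kind of negative test the issue asks for usually follows a simple pattern: run the validation step and assert that it throws. The helper below is a generic illustration of that pattern; it is not Flink's actual ExpressionTestBase API, and the exception type is a placeholder.

```java
// Generic negative-test pattern: a validation step must fail for the
// test to pass. Illustrative only; not Flink's ExpressionTestBase API.
public class InvalidExpressionTest {

    static void assertInvalid(Runnable validation) {
        try {
            validation.run();
        } catch (RuntimeException expected) {
            return; // validation rejected the expression, as desired
        }
        throw new AssertionError("expected the expression to be rejected");
    }

    public static void main(String[] args) {
        // Stand-in for validating an ill-typed expression:
        assertInvalid(() -> {
            throw new IllegalArgumentException("invalid expression");
        });
        System.out.println("ok");
    }
}
```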





[jira] [Commented] (FLINK-3580) Reintroduce Date/Time and implement scalar functions for it

2016-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509873#comment-15509873
 ] 

ASF GitHub Bot commented on FLINK-3580:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2468#discussion_r79824752
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -472,5 +472,30 @@ object localTimestamp {
   }
 }
 
+/**
+  * Determines whether two anchored time intervals overlap.
+  *
+  * It evaluates: leftTemporal >= rightTimePoint && rightTemporal >= 
leftTimePoint
+  *
+  * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 
2.hour) leads to true
+  */
+object temporalOverlaps {
+
+  /**
+* Determines whether two anchored time intervals overlap.
+*
+* It evaluates: leftTemporal >= rightTimePoint && rightTemporal >= 
leftTimePoint
--- End diff --

Rephrase condition
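For reference, the overlap condition from the javadoc under review can be written out explicitly: two anchored intervals overlap iff each ends no earlier than the other begins. A minimal sketch, with times represented as plain longs rather than Flink's temporal types:

```java
// Overlap predicate for two anchored intervals [leftStart, leftEnd] and
// [rightStart, rightEnd]: they overlap iff each interval ends no earlier
// than the other one starts.
public class OverlapDemo {

    static boolean overlaps(long leftStart, long leftEnd, long rightStart, long rightEnd) {
        return leftEnd >= rightStart && rightEnd >= leftStart;
    }

    public static void main(String[] args) {
        // The javadoc's example, with times in minutes since midnight:
        // left  = 2:55 for 1 hour  -> [175, 235]
        // right = 3:30 for 2 hours -> [210, 330]
        System.out.println(overlaps(175, 235, 210, 330)); // prints "true"
    }
}
```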


> Reintroduce Date/Time and implement scalar functions for it
> ---
>
> Key: FLINK-3580
> URL: https://issues.apache.org/jira/browse/FLINK-3580
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> This task includes:
> {code}
> DATETIME_PLUS
> EXTRACT_DATE
> FLOOR
> CEIL
> CURRENT_TIME
> CURRENT_TIMESTAMP
> LOCALTIME
> LOCALTIMESTAMP
> {code}





[GitHub] flink pull request #2468: [FLINK-3580] [table] Add OVERLAPS function

2016-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2468#discussion_r79824738
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -472,5 +472,30 @@ object localTimestamp {
   }
 }
 
+/**
+  * Determines whether two anchored time intervals overlap.
+  *
+  * It evaluates: leftTemporal >= rightTimePoint && rightTemporal >= 
leftTimePoint
--- End diff --

Rephrase condition



