[jira] [Comment Edited] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-18 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587764#comment-15587764
 ] 

Gabor Gevay edited comment on FLINK-3999 at 10/19/16 5:47 AM:
--

Won't Fix; See comment here:
https://github.com/apache/flink/pull/2642#issuecomment-254628846


was (Author: ggevay):
See comment here:
https://github.com/apache/flink/pull/2642#issuecomment-254628846

> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, it is not running anymore, but the 
> {{running}} flag will still be true, since the flag is only set to false 
> when cancelling.
> It should be renamed, and its value inverted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-18 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-3999.

Resolution: Won't Fix

See comment here:
https://github.com/apache/flink/pull/2642#issuecomment-254628846

> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, it is not running anymore, but the 
> {{running}} flag will still be true, since the flag is only set to false 
> when cancelling.
> It should be renamed, and its value inverted.





[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...

2016-10-18 Thread ggevay
Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2642
  
OK, I see your point now @StephanEwen.
@nssalian, I'm sorry for opening the jira; I didn't think it through from 
this perspective at the time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587758#comment-15587758
 ] 

ASF GitHub Bot commented on FLINK-3999:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2642
  
OK, I see your point now @StephanEwen.
@nssalian, I'm sorry for opening the jira; I didn't think it through from 
this perspective at the time.


> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, it is not running anymore, but the 
> {{running}} flag will still be true, since the flag is only set to false 
> when cancelling.
> It should be renamed, and its value inverted.





[jira] [Updated] (FLINK-4854) Efficient Batch Operator in Streaming

2016-10-18 Thread Xiaowei Jiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaowei Jiang updated FLINK-4854:
-
Component/s: DataStream API

> Efficient Batch Operator in Streaming
> -
>
> Key: FLINK-4854
> URL: https://issues.apache.org/jira/browse/FLINK-4854
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Xiaowei Jiang
>Assignee: MaGuowei
>  Labels: features
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Very often, it's more efficient to process a batch of records at once instead 
> of processing them one by one. We can use a window to achieve this 
> functionality. However, a window will store all records in state, which can 
> be costly. It's desirable to have an efficient implementation of a batch 
> operator. The batch operator works per task and behaves similarly to aligned 
> windows. Here is an example of how the interface looks to a user.
> interface BatchFunction<T> {
> // add the record to the buffer
> // returns whether the batch is ready to be flushed
> boolean addRecord(T record);
> // process all pending records in the buffer
> void flush(Collector<T> collector);
> }
> DataStream<T> ds = ...
> BatchFunction<T> func = ...
> ds.batch(func);
> The operator calls addRecord for each record. The batch function saves the 
> record in its own buffer. addRecord returns whether the pending buffer should 
> be flushed. In that case, the operator invokes flush.





[jira] [Commented] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read

2016-10-18 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587389#comment-15587389
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4727:


Resolved for master via 
http://git-wip-us.apache.org/repos/asf/flink/commit/e4343ba

> Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no 
> data is read
> --
>
> Key: FLINK-4727
> URL: https://issues.apache.org/jira/browse/FLINK-4727
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> This is basically the 0.9 version counterpart for FLINK-3440.
> When the 0.9 consumer fetches initial offsets from Kafka on startup, but does 
> not have any data to read, it should also checkpoint & commit these initial 
> offsets.





[jira] [Created] (FLINK-4856) Add MapState for keyed streams

2016-10-18 Thread Xiaogang Shi (JIRA)
Xiaogang Shi created FLINK-4856:
---

 Summary: Add MapState for keyed streams
 Key: FLINK-4856
 URL: https://issues.apache.org/jira/browse/FLINK-4856
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Reporter: Xiaogang Shi


Many states in keyed streams are organized as key-value pairs. Currently, these 
states are implemented by storing the entire map in a ValueState or a 
ListState. This implementation, however, is very costly because all entries 
have to be serialized/deserialized when a single entry is updated. To improve 
the efficiency of these states, MapState is urgently needed. 
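The cost difference described above can be sketched with plain Java serialization, outside Flink. The class and method names below (MapStateCostSketch, serializedSize) are hypothetical illustrations, not Flink state API: updating one entry of a map stored as a single serialized value rewrites the whole map, while a per-entry layout rewrites only the touched entry.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class MapStateCostSketch {
    // Bytes written when serializing one value with plain Java serialization.
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        HashMap<String, Long> counts = new HashMap<>();
        for (int i = 0; i < 10_000; i++) {
            counts.put("key-" + i, (long) i);
        }

        // ValueState<Map<...>>-style update: the whole map is rewritten
        // in order to change a single entry.
        int wholeMap = serializedSize(counts);

        // MapState-style update: only the touched key and value are rewritten.
        int oneEntry = serializedSize("key-42") + serializedSize(42L);

        System.out.println("whole map: " + wholeMap + " bytes");
        System.out.println("one entry: " + oneEntry + " bytes");
    }
}
```

For a map of ten thousand entries, the per-entry write is orders of magnitude smaller than rewriting the whole map, which is the motivation for a dedicated MapState.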





[jira] [Commented] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587370#comment-15587370
 ] 

ASF GitHub Bot commented on FLINK-4727:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2585


> Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no 
> data is read
> --
>
> Key: FLINK-4727
> URL: https://issues.apache.org/jira/browse/FLINK-4727
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> This is basically the 0.9 version counterpart for FLINK-3440.
> When the 0.9 consumer fetches initial offsets from Kafka on startup, but does 
> not have any data to read, it should also checkpoint & commit these initial 
> offsets.





[GitHub] flink pull request #2585: [FLINK-4727] [kafka-connector] Set missing initial...

2016-10-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2585




[jira] [Commented] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15587354#comment-15587354
 ] 

ASF GitHub Bot commented on FLINK-4727:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2585
  
The failing tests are unrelated, and related tests are covered and have 
passed. Merging this ...


> Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no 
> data is read
> --
>
> Key: FLINK-4727
> URL: https://issues.apache.org/jira/browse/FLINK-4727
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>
>
> This is basically the 0.9 version counterpart for FLINK-3440.
> When the 0.9 consumer fetches initial offsets from Kafka on startup, but does 
> not have any data to read, it should also checkpoint & commit these initial 
> offsets.





[GitHub] flink issue #2585: [FLINK-4727] [kafka-connector] Set missing initial offset...

2016-10-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2585
  
The failing tests are unrelated, and related tests are covered and have 
passed. Merging this ...




[jira] [Created] (FLINK-4855) Add partitionedKeyBy to DataStream

2016-10-18 Thread Xiaowei Jiang (JIRA)
Xiaowei Jiang created FLINK-4855:


 Summary: Add partitionedKeyBy to DataStream
 Key: FLINK-4855
 URL: https://issues.apache.org/jira/browse/FLINK-4855
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Xiaowei Jiang
Assignee: MaGuowei


After we do any interesting operation (e.g. reduce) on a KeyedStream, the 
result becomes a DataStream. In many cases, the output still has the same or 
compatible keys as the KeyedStream (logically). But to do further operations 
on these keys, we are forced to use keyBy again. This works semantically, but 
is costly in two respects. First, it destroys the possibility of chaining, 
which is one of the most important optimization techniques. Second, keyBy 
greatly expands the connected components of tasks, which has implications for 
failover optimization.

To address this shortcoming, we propose a new operator partitionedKeyBy.

DataStream<T> {
    public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
}

Semantically, DataStream.partitionedKeyBy(key) is equivalent to 
DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task ID 
as an extra field. This guarantees that records from different tasks will 
never produce the same keys.

With this, it's possible to do

ds.keyBy(key1).reduce(func1)
.partitionedKeyBy(key1).reduce(func2)
.partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases we will be able to chain these into a 
single vertex.
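The keyBy-equivalence above can be illustrated outside Flink: prefixing each key with the producing task's ID guarantees disjoint key spaces per task, so records never need to be reshuffled across tasks. The class below (PartitionedKey) is a hypothetical sketch, not Flink API:

```java
import java.util.Objects;

// A composite key: the original key plus the producing task's ID.
// Records from different tasks can never collide on this key.
final class PartitionedKey<K> {
    final int taskId;
    final K key;

    PartitionedKey(int taskId, K key) {
        this.taskId = taskId;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionedKey)) return false;
        PartitionedKey<?> other = (PartitionedKey<?>) o;
        return taskId == other.taskId && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskId, key);
    }
}

public class PartitionedKeySketch {
    public static void main(String[] args) {
        // The same logical key "user-1" seen by two different tasks:
        PartitionedKey<String> fromTask0 = new PartitionedKey<>(0, "user-1");
        PartitionedKey<String> fromTask1 = new PartitionedKey<>(1, "user-1");
        // Distinct composite keys, so each task keeps its own local state
        // and downstream operators can stay chained to the producer.
        System.out.println(fromTask0.equals(fromTask1)); // false
        System.out.println(fromTask0.equals(new PartitionedKey<>(0, "user-1"))); // true
    }
}
```

Because the composite key space is partitioned by task, grouping on it is a purely local operation, which is what makes chaining into a single vertex possible.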






[jira] [Created] (FLINK-4854) Efficient Batch Operator in Streaming

2016-10-18 Thread Xiaowei Jiang (JIRA)
Xiaowei Jiang created FLINK-4854:


 Summary: Efficient Batch Operator in Streaming
 Key: FLINK-4854
 URL: https://issues.apache.org/jira/browse/FLINK-4854
 Project: Flink
  Issue Type: Improvement
Reporter: Xiaowei Jiang
Assignee: MaGuowei


Very often, it's more efficient to process a batch of records at once instead 
of processing them one by one. We can use a window to achieve this 
functionality. However, a window will store all records in state, which can be 
costly. It's desirable to have an efficient implementation of a batch 
operator. The batch operator works per task and behaves similarly to aligned 
windows. Here is an example of how the interface looks to a user.

interface BatchFunction<T> {
    // add the record to the buffer
    // returns whether the batch is ready to be flushed
    boolean addRecord(T record);

    // process all pending records in the buffer
    void flush(Collector<T> collector);
}

DataStream<T> ds = ...
BatchFunction<T> func = ...
ds.batch(func);

The operator calls addRecord for each record. The batch function saves the 
record in its own buffer. addRecord returns whether the pending buffer should 
be flushed. In that case, the operator invokes flush.
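The addRecord/flush contract can be sketched as a runnable, Flink-free example. Everything here is hypothetical illustration (CountBatchFunction, BatchSketch, the local Collector stand-in, and the size-based flush policy), not the proposed Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Flink's Collector, so the sketch runs without Flink.
interface Collector<T> {
    void collect(T record);
}

// A size-based batch function: signal a flush once the buffer reaches maxSize.
class CountBatchFunction<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int maxSize;

    CountBatchFunction(int maxSize) { this.maxSize = maxSize; }

    // add the record to the buffer; returns whether the batch is ready to flush
    boolean addRecord(T record) {
        buffer.add(record);
        return buffer.size() >= maxSize;
    }

    // emit all pending records as one batch, then clear the buffer
    void flush(Collector<List<T>> out) {
        out.collect(new ArrayList<>(buffer));
        buffer.clear();
    }
}

public class BatchSketch {
    // Drives the function the way the operator would: addRecord per record,
    // flush whenever addRecord says the batch is ready.
    static List<List<Integer>> run(int n, int batchSize) {
        CountBatchFunction<Integer> func = new CountBatchFunction<>(batchSize);
        List<List<Integer>> batches = new ArrayList<>();
        Collector<List<Integer>> out = batches::add;
        for (int i = 1; i <= n; i++) {
            if (func.addRecord(i)) {
                func.flush(out);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(run(7, 3)); // [[1, 2, 3], [4, 5, 6]]
    }
}
```

Note that only the small buffer lives in memory between flushes, which is the claimed advantage over keeping every record in window state.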





[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586694#comment-15586694
 ] 

ASF GitHub Bot commented on FLINK-3999:
---

Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2642
  
@StephanEwen, thanks for that; that makes sense. But it is hard for someone 
new to understand the problem you mention. If someone familiar with the 
product could review new JIRAs for clear benefit/validity, that would help 
folks jumping on the tasks to know what they are getting into. I realize it 
may not be feasible, but letting them know in advance would also save time. I 
can certainly close the PR and mark the issue as Not a Problem. But if there 
is a better way to make this clearer, I would appreciate hearing it before the 
effort is put in. 


> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, it is not running anymore, but the 
> {{running}} flag will still be true, since the flag is only set to false 
> when cancelling.
> It should be renamed, and its value inverted.





[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...

2016-10-18 Thread nssalian
Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2642
  
@StephanEwen, thanks for that; that makes sense. But it is hard for someone 
new to understand the problem you mention. If someone familiar with the 
product could review new JIRAs for clear benefit/validity, that would help 
folks jumping on the tasks to know what they are getting into. I realize it 
may not be feasible, but letting them know in advance would also save time. I 
can certainly close the PR and mark the issue as Not a Problem. But if there 
is a better way to make this clearer, I would appreciate hearing it before the 
effort is put in. 




[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector

2016-10-18 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4850:
---
Description: 
It seems that evaluate operation is defined for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML when using SVM.

1- We need to update the documentation as follows:

 val astroTest:DataSet[(Vector,Double)] = MLUtils
  .readLibSVM(env, "src/main/resources/svmguide1.t")
  .map(l => (l.vector, l.label))

val predictionPairs = svm.evaluate(astroTest)


2- Update code such that LabeledVector can be used with evaluate method








  was:
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML.

We need to update the documentation as follows:

 val astroTest:DataSet[(Vector,Double)] = MLUtils
  .readLibSVM(env, "src/main/resources/svmguide1.t")
  .map(l => (l.vector, l.label))

val predictionPairs = svm.evaluate(astroTest)









> FlinkML - SVM predict Operation for Vector and not LaveledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>
> It seems that the evaluate operation is defined for Vector and not LabeledVector.
> This impacts the QuickStart guide for FlinkML when using SVM.
> 1- We need to update the documentation as follows:
>  val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)
> 2- Update code such that LabeledVector can be used with evaluate method





[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586580#comment-15586580
 ] 

ASF GitHub Bot commented on FLINK-3999:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2642
  
I am a bit skeptical about these types of changes.

Maintaining stability of Flink is crucially important now. With changes 
like these we risk introducing subtle bugs without fixing a bug or improving 
functionality; it just changes a naming preference.

The kinds of bugs one can introduce with changes like that are very subtle 
(we have seen cases where the cancelling logic was fragile depending on how 
exactly the flag was set) and hard to catch in reviews. Not everything is 
fully covered by bulletproof tests in practice.

I would actually suggest contributing to issues where there is a clearer 
benefit.


> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, it is not running anymore, but the 
> {{running}} flag will still be true, since the flag is only set to false 
> when cancelling.
> It should be renamed, and its value inverted.





[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...

2016-10-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2642
  
I am a bit skeptical about these types of changes.

Maintaining stability of Flink is crucially important now. With changes 
like these we risk introducing subtle bugs without fixing a bug or improving 
functionality; it just changes a naming preference.

The kinds of bugs one can introduce with changes like that are very subtle 
(we have seen cases where the cancelling logic was fragile depending on how 
exactly the flag was set) and hard to catch in reviews. Not everything is 
fully covered by bulletproof tests in practice.

I would actually suggest contributing to issues where there is a clearer 
benefit.




[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586564#comment-15586564
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
Resolved merge conflicts and squashed commits to rebase with master


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-10-18 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
Resolved merge conflicts and squashed commits to rebase with master




[jira] [Closed] (FLINK-4846) FlinkML - Pass "env" has an implicit parameter in MLUtils.readLibSVM

2016-10-18 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER closed FLINK-4846.
--
Resolution: Invalid

> FlinkML - Pass "env" has an implicit parameter in MLUtils.readLibSVM
> 
>
> Key: FLINK-4846
> URL: https://issues.apache.org/jira/browse/FLINK-4846
> Project: Flink
>  Issue Type: Improvement
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> With Flink ML you can import file via MLUtils.readLibSVM (import 
> org.apache.flink.ml.MLUtils)
> For example:
> val env = ExecutionEnvironment.getExecutionEnvironment
> val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM(env, 
> "src/main/resources/svmguide1")
> I'd like to pass "env" as an implicit parameter and use the method as such:
> val astroTrain: DataSet[LabeledVector] = 
> MLUtils.readLibSVM("src/main/resources/svmguide1")
> Is it ok (not a scala specialist yet :) ) ?
>  





[jira] [Assigned] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime

2016-10-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-4852:
---

Assignee: Aljoscha Krettek

> ClassCastException when assigning Watermarks with 
> TimeCharacteristic.ProcessingTime
> ---
>
> Key: FLINK-4852
> URL: https://issues.apache.org/jira/browse/FLINK-4852
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0, 1.1.4
>
>
> As per FLINK-3688 and FLINK-2936 this should already been resolved. Still, 
> when emitting Watermarks and using processing time, you get the following 
> ClassCastException:
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
>   at 
> org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340)
>   at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
>   ... 11 more
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44)
>   at 
> 

[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586082#comment-15586082
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Concerning the Kafka test: From the logs, the test fails because a topic 
cannot be deleted. The ZooKeeper operation blocks and test times out. I am 
pretty sure that this is unrelated, as no Flink is running at that point.


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Concerning the Kafka test: From the logs, the test fails because a topic 
cannot be deleted. The ZooKeeper operation blocks and test times out. I am 
pretty sure that this is unrelated, as no Flink is running at that point.




[jira] [Commented] (FLINK-4445) Ignore unmatched state when restoring from savepoint

2016-10-18 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586067#comment-15586067
 ] 

Stephan Ewen commented on FLINK-4445:
-

+1 for option (1)

> Ignore unmatched state when restoring from savepoint
> 
>
> Key: FLINK-4445
> URL: https://issues.apache.org/jira/browse/FLINK-4445
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.1
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Currently, when submitting a job with a savepoint, we require that all state 
> is matched to the new job. Many users have noted that this is overly strict. 
> I would like to loosen this and allow savepoints to be restored without 
> matching all state.
> The following options come to mind:
> (1) Keep the current behaviour, but add a flag to allow ignoring state when 
> restoring, e.g. {{bin/flink -s  --ignoreUnmatchedState}}. This 
> would be non-API breaking.
> (2) Ignore unmatched state and continue. Additionally add a flag to be strict 
> about checking the state, e.g. {{bin/flink -s  --strict}}. This 
> would be API-breaking as the default behaviour would change. Users might be 
> confused by this because there is no straightforward way to notice that 
> nothing has been restored.
> I'm not sure what's the best thing here. [~gyfora], [~aljoscha] What do you 
> think?
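Option (1) above amounts to a simple check while assigning savepoint state to operators of the new job. The following is a minimal sketch under hypothetical names, not the actual Flink restore code:

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of strict vs. lenient savepoint state matching. */
public class SavepointMatcher {
    public static void restore(Map<String, byte[]> savepointState,
                               Set<String> operatorIdsInNewJob,
                               boolean ignoreUnmatchedState) {
        for (String operatorId : savepointState.keySet()) {
            if (!operatorIdsInNewJob.contains(operatorId)) {
                if (ignoreUnmatchedState) {
                    // Option (1): silently skip state that has no operator in the new job.
                    continue;
                }
                // Current behaviour: fail the submission.
                throw new IllegalStateException("No operator for state " + operatorId);
            }
            // ... hand the state over to the matching operator ...
        }
    }
}
```

With the flag the default stays strict, so existing users only see a behaviour change if they opt in.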





[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585948#comment-15585948
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Travis gives the green light except for a Kafka failure. @StephanEwen 
@rmetzger Do you know whether this is a known issue? Or might it be a 
regression from this change? 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/168613643/log.txt


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. when the task cannot be cancelled within
> a given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety net against faulty user code.





[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Travis gives the green light except for a Kafka failure. @StephanEwen 
@rmetzger Do you know whether this is a known issue? Or might it be a 
regression from this change? 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/168613643/log.txt




[GitHub] flink pull request #2656: [FLINK-4852] Remove Non-Multiplexing StreamRecordS...

2016-10-18 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2656

[FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer

This gets rid of a source of `ClassCastException`s that has cropped up 
several times now. We now simply always do the multiplexing, which adds the cost 
of transmitting one extra byte with every element we transmit.
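The tag-byte multiplexing described above can be sketched as follows. This is a simplified illustration, not the actual `StreamElementSerializer` code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Simplified sketch of tag-byte multiplexing for stream elements. */
public class MultiplexSketch {
    static final byte TAG_RECORD = 0;
    static final byte TAG_WATERMARK = 1;

    private static byte[] write(byte tag, long payload) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeByte(tag);    // the one extra byte per element
            out.writeLong(payload);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static byte[] serializeRecord(long value)  { return write(TAG_RECORD, value); }
    public static byte[] serializeWatermark(long ts)  { return write(TAG_WATERMARK, ts); }

    /** The reader dispatches on the tag, so a watermark can never be mistaken for a record. */
    public static String describe(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            byte tag = in.readByte();
            long payload = in.readLong();
            return (tag == TAG_RECORD ? "record:" : "watermark:") + payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the element kind is encoded explicitly, the deserializer never has to assume every element is a `StreamRecord`, which is exactly the assumption the non-multiplexing serializer made.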

R: @mxm and @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink remove-stream-record-serializer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2656.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2656


commit 87fd6b6d824c03f2ff6591af8ae28a9b6e65296b
Author: Aljoscha Krettek 
Date:   2016-10-18T16:32:17Z

[FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer

This also renames MultiplexingStreamRecordSerializer to
StreamElementSerializer.






[jira] [Commented] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585929#comment-15585929
 ] 

ASF GitHub Bot commented on FLINK-4852:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2656

[FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer

This gets rid of a source of `ClassCastException`s that has cropped up 
several times now. We now simply always do the multiplexing, which adds the cost 
of transmitting one extra byte with every element we transmit.

R: @mxm and @StephanEwen 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink remove-stream-record-serializer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2656.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2656


commit 87fd6b6d824c03f2ff6591af8ae28a9b6e65296b
Author: Aljoscha Krettek 
Date:   2016-10-18T16:32:17Z

[FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer

This also renames MultiplexingStreamRecordSerializer to
StreamElementSerializer.




> ClassCastException when assigning Watermarks with 
> TimeCharacteristic.ProcessingTime
> ---
>
> Key: FLINK-4852
> URL: https://issues.apache.org/jira/browse/FLINK-4852
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> As per FLINK-3688 and FLINK-2936 this should already have been resolved. Still, 
> when emitting Watermarks and using processing time, you get the following 
> ClassCastException:
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
>   at 
> org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>   at 
> 

[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...

2016-10-18 Thread nssalian
Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2642
  
@ggevay they are not related. I think they were carried over from my old branch 
onto this one. The three commits in the middle (2ef4401, 46d91b0, 1433a5d) are 
the relevant ones here. Apologies for the mix-up. 




[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585754#comment-15585754
 ] 

ASF GitHub Bot commented on FLINK-3999:
---

Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2642
  
@ggevay they are not related. I think they were carried over from my old branch 
onto this one. The three commits in the middle (2ef4401, 46d91b0, 1433a5d) are 
the relevant ones here. Apologies for the mix-up. 


> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage: 
> when the operator just stops normally, then it is not running anymore, but 
> the {{running}}  flag will still be true, since the {{running}} flag is only 
> set when cancelling.
> It should be renamed, and the value inverted.
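The mismatch the issue describes can be shown with a minimal sketch (hypothetical driver skeleton; the real drivers are in flink-runtime):

```java
/** Hypothetical driver skeleton illustrating why `running` is a misleading name. */
public class DriverSketch {
    private volatile boolean running = true; // only cancel() ever flips this
    private int remaining = 3;               // stand-in for the driver's input

    public void run() {
        while (running && remaining > 0) {
            remaining--; // process one record ...
        }
        // Normal termination: `running` is still true here, which is misleading.
        // A `canceled` flag with inverted meaning would match the semantics.
    }

    public void cancel() {
        running = false;
    }

    /** Exposes the flag so the mismatch is observable. */
    public boolean runningFlag() {
        return running;
    }
}
```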





[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library

2016-10-18 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585716#comment-15585716
 ] 

Gábor Hermann commented on FLINK-1807:
--

Thanks for your reply!

Then, if I understand you correctly, this solution would not be appropriate 
because of its excessive memory use.

I believe sampling with a dynamic path could have another significant overhead. 
With that approach, we would have to load a sample/minibatch of the data from 
another resource (disk/network) at every iteration step, which might hurt 
performance. Of course, if the sampling read only the needed sample from disk 
rather than the whole data, performance would be reasonable. If, on the other 
hand, sampling had to read the whole data at every iteration, it could be quite 
slow. (There is a third case, where we keep the data in memory so that sampling 
does not have to do IO, but then memory usage is similar to my workaround.)

As I see it, the two solutions (my suggested workaround and sampling with a 
dynamic path) are the two sides of a memory-performance tradeoff: mine uses too 
much memory, while the other is (possibly) slow. Do I see this right? Do you 
think it is worth choosing the sampling approach here because its performance 
overhead would be much lower? Or is my workaround too "hacky", in that whether 
the sampling happens from memory or from disk should not be baked into the 
algorithm?
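The in-memory side of the tradeoff discussed above can be made concrete with a toy sketch (hypothetical, not FlinkML code): caching the dataset makes per-iteration sampling cheap, at the price of pinning the whole dataset in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Toy illustration of the memory-vs-IO tradeoff for minibatch sampling. */
public class MinibatchSketch {
    /** In-memory variant: cheap sampling, memory proportional to the full dataset. */
    static List<double[]> sampleFromMemory(List<double[]> cached, int k, Random rnd) {
        List<double[]> batch = new ArrayList<>(k);
        for (int i = 0; i < k; i++) {
            batch.add(cached.get(rnd.nextInt(cached.size())));
        }
        return batch;
    }

    // The dynamic-path alternative would re-open the input source here at every
    // iteration, paying disk/network IO instead of holding `cached` in memory.
}
```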

> Stochastic gradient descent optimizer for ML library
> 
>
> Key: FLINK-1807
> URL: https://issues.apache.org/jira/browse/FLINK-1807
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Theodore Vasiloudis
>  Labels: ML
>
> Stochastic gradient descent (SGD) is a widely used optimization technique in 
> different ML algorithms. Thus, it would be helpful to provide a generalized 
> SGD implementation which can be instantiated with the respective gradient 
> computation. Such a building block would make the development of future 
> algorithms easier.





[jira] [Created] (FLINK-4853) Clean up JobManager registration at the ResourceManager

2016-10-18 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4853:


 Summary: Clean up JobManager registration at the ResourceManager
 Key: FLINK-4853
 URL: https://issues.apache.org/jira/browse/FLINK-4853
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann


The current {{JobManager}} registration at the {{ResourceManager}} blocks 
threads in the {{RpcService.execute}} pool. This is not ideal and can be 
avoided by not waiting on a {{Future}} in this call.

I propose to encapsulate the leader id retrieval operation in a distinct 
service so that it can be separated from the {{ResourceManager}}. This will 
reduce the complexity of the {{ResourceManager}} and make the individual 
components easier to test.
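The non-blocking alternative amounts to composing on the future instead of waiting on it. A minimal sketch with hypothetical names (not the actual ResourceManager code):

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Sketch: react to the leader id asynchronously instead of blocking a pool thread. */
public class LeaderIdRetrievalSketch {

    /** Stand-in for a leader retrieval service resolving the JobManager's leader id. */
    static CompletableFuture<UUID> retrieveLeaderId() {
        return CompletableFuture.completedFuture(UUID.randomUUID());
    }

    /** Blocking variant (what the issue wants to avoid): ties up a thread in get(). */
    static UUID registerBlocking() throws Exception {
        return retrieveLeaderId().get();
    }

    /** Non-blocking variant: continue the registration when the id arrives. */
    static CompletableFuture<String> registerAsync() {
        return retrieveLeaderId()
            .thenApply(leaderId -> "registered JobManager with leader id " + leaderId);
    }
}
```

Moving the retrieval behind its own service interface also makes it trivial to substitute a deterministic implementation in tests, which is the testability gain the issue mentions.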





[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin

2016-10-18 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585619#comment-15585619
 ] 

Ted Yu commented on FLINK-4499:
---

+1 on starting with a very small set of rules

> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.
> We can begin with small set of rules.





[jira] [Created] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime

2016-10-18 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-4852:
-

 Summary: ClassCastException when assigning Watermarks with 
TimeCharacteristic.ProcessingTime
 Key: FLINK-4852
 URL: https://issues.apache.org/jira/browse/FLINK-4852
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.2.0, 1.1.3
Reporter: Maximilian Michels
 Fix For: 1.2.0, 1.1.4



As per FLINK-3688 and FLINK-2936 this should already have been resolved. Still, when 
emitting Watermarks and using processing time, you get the following 
ClassCastException:

{noformat}

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next 
operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
at 
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
at 
org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340)
at 
org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
... 11 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340)
at 
org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:340)
... 14 more
Caused by: java.lang.ClassCastException: 
org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
at 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)

[jira] [Closed] (FLINK-4512) Add option for persistent checkpoints

2016-10-18 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4512.
--
   Resolution: Fixed
Fix Version/s: 1.2.0

Implemented in fd410d9 (master).

> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> Allow periodic checkpoints to be persisted by writing out their meta data. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases whereas persistent 
> checkpoints only on FINISHED. Maybe with the option to customize behaviour on 
> CANCELLED or FAILED.
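The disposal rules proposed above reduce to a small decision table; a sketch (hypothetical names, not the actual CheckpointCoordinator code):

```java
/** Sketch of the proposed disposal rules for checkpoints in globally terminal job states. */
public class DisposalSketch {
    enum TerminalState { FINISHED, CANCELLED, FAILED }

    static boolean dispose(TerminalState state, boolean persistent) {
        if (!persistent) {
            return true;                        // regular checkpoints: always cleaned up
        }
        return state == TerminalState.FINISHED; // persistent: kept on CANCELLED/FAILED
    }
}
```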





[jira] [Closed] (FLINK-4507) Deprecate savepoint backend config

2016-10-18 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4507.
--
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in fd410d9 (master).

> Deprecate savepoint backend config
> --
>
> Key: FLINK-4507
> URL: https://issues.apache.org/jira/browse/FLINK-4507
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> The savepoint backend configuration allows both {{jobmanager}} and 
> {{filesystem}} as values. The {{jobmanager}} variant is used as default if 
> nothing is configured.
> As part of FLIP-10, we want to get rid of this distinction and make all 
> savepoints go to a file. Savepoints backed by JobManagers are only relevant 
> for testing. Users could only recover from them if they did not shut down the 
> current cluster.
> Deprecate the {{savepoints.state.backend.fs.dir}} and add 
> {{state.savepoints.dir}} as new config key. This is used as the default 
> savepoint directory. Users can overwrite this. 





[jira] [Closed] (FLINK-4509) Specify savepoint directory per savepoint

2016-10-18 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4509.
--
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in fd410d9 (master).

> Specify savepoint directory per savepoint
> -
>
> Key: FLINK-4509
> URL: https://issues.apache.org/jira/browse/FLINK-4509
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> Currently, savepoints go to a per cluster configured default directory 
> (configured via {{savepoints.state.backend}} and 
> {{savepoints.state.backend.fs.dir}}).
> We shall allow to specify the directory per triggered savepoint in case no 
> default is configured.





[GitHub] flink pull request #2655: [FLINK-4851] [rm] Introduce FatalErrorHandler and ...

2016-10-18 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2655

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR is based on #2651 

This PR introduces a `FatalErrorHandler` and the `MetricRegistry` to the 
RM. The `FatalErrorHandler` is used to handle fatal errors. Additionally, the 
PR adds the `MetricRegistry` to the RM which can be used to register metrics.

Apart from these changes the PR restructures the code of the RM a little 
bit and fixes some
blocking operations.

The PR also moves the `TestingFatalErrorHandler` into the util package of 
flink-runtime test. That way, it is usable across multiple tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink resourceManagerImprovements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2655


commit ddf35c4ddb04629cddebb2401488effe93416b70
Author: Till Rohrmann 
Date:   2016-10-17T14:22:16Z

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing the RpcEndpoint.start/shutDown to throw exceptions will help to 
let rpc endpoints
to quickly fail without having to use a callback like the FatalErrorHandler.

commit 9fdcc11f7730b3c7a6f061c17d0463ea3f21a9f9
Author: Till Rohrmann 
Date:   2016-10-17T14:03:02Z

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. 
The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds 
the MetricRegistry to the RM which can be used
to register metrics.

Apart from these changes the PR restructures the code of the RM a little 
bit and fixes some
blocking operations.

The PR also moves the TestingFatalErrorHandler into the util package of 
flink-runtime test. That way, it is usable across multiple tests.






[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector

2016-10-18 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4850:
---
Description: 
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML.

We need to update the documentation as follows:

 val astroTest:DataSet[(Vector,Double)] = MLUtils
  .readLibSVM(env, "src/main/resources/svmguide1.t")
  .map(l => (l.vector, l.label))

val predictionPairs = svm.evaluate(astroTest)








  was:
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML.

Update 







> FlinkML - SVM predict Operation for Vector and not LaveledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>
> It seems that SVM has predict operation for Vector and not LabeledVector.
> It impacts QuickStart guide for FlinkML.
> We need to update the documentation as follows:
>  val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)





[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector

2016-10-18 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4850:
---
Description: 
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML.

Update 






  was:
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML.






> FlinkML - SVM predict Operation for Vector and not LaveledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>
> It seems that SVM has predict operation for Vector and not LabeledVector.
> It impacts QuickStart guide for FlinkML.
> Update 





[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83854971
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
 ---
@@ -15,38 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.streaming.api.operators;
 
-package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.annotation.Internal;
 
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import java.util.concurrent.ScheduledFuture;
-
-class NoOpTimerService extends TimeServiceProvider {
-
-   private volatile boolean terminated;
-
-   @Override
-   public long getCurrentProcessingTime() {
-   return System.currentTimeMillis();
-   }
-
-   @Override
-   public ScheduledFuture registerTimer(long timestamp, Triggerable 
target) {
-   return null;
-   }
-
-   @Override
-   public boolean isTerminated() {
-   return terminated;
-   }
-
-   @Override
-   public void quiesceAndAwaitPending() {}
-
-   @Override
-   public void shutdownService() {
-   terminated = true;
-   }
+/**
+ * Interface for things that can be called by {@link InternalTimerService}.
+ *
+ * @param  Type of the keys to which timers are scoped.
+ * @param  Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface Triggerable {
--- End diff --

I found this name a bit confusing, as it could be conflated with the concept 
of Triggers. Furthermore, there is another class with the same simple name in a 
different package. Maybe this could be called `TimerCallback`?




[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585571#comment-15585571
 ] 

ASF GitHub Bot commented on FLINK-3322:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2510
  
Sorry, I'm extremely busy these days. I'm not sure when I will have time, 
unfortunately.


> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
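The effect described above, pooled reuse versus GC-collected allocations, can be sketched in plain Java. This is only an illustration of the pooling idea, not Flink's MemoryManager; the `SegmentPool` class and its methods are hypothetical:

```java
import java.util.ArrayDeque;

public class SegmentPool {
    // Released segments are kept on a free list instead of being dropped for the GC.
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private final int segmentSize;

    SegmentPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    byte[] allocate() {
        byte[] seg = free.poll();
        // Allocate a fresh segment only on a pool miss; a non-pooling manager
        // would hit this path on every superstep of an iterative job.
        return (seg != null) ? seg : new byte[segmentSize];
    }

    void release(byte[] seg) {
        free.push(seg);
    }

    public static void main(String[] args) {
        SegmentPool pool = new SegmentPool(32 * 1024);
        byte[] a = pool.allocate();
        pool.release(a);
        // The released segment is reused rather than reallocated.
        if (pool.allocate() != a) {
            throw new AssertionError("expected segment reuse");
        }
        System.out.println("ok");
    }
}
```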



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83862277
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
+   // ------------------------------------------------------------------------
+   //  Watermark handling
+   // ------------------------------------------------------------------------
+
+   /**
+* Returns a {@link InternalTimerService} that can be used to query 
current processing time
+* and event time and to set timers. An operator can have several timer 
services, where
+* each has its own namespace serializer. Timer services are 
differentiated by the string
+* key that is given when requesting them, if you call this method with 
the same key
+* multiple times you will get the same timer service instance in 
subsequent requests.
+*
+* Timers are always scoped to a key, the currently active key of a 
keyed stream operation.
+* When a timer fires, this key will also be set as the currently 
active key.
+*
+* Each timer has attached metadata, the namespace. Different timer 
services
+* can have a different namespace type. If you don't need namespace 
differentiation you
+* can use {@link VoidNamespaceSerializer} as the namespace serializer.
+*
+* @param name The name of the requested timer service. If no service 
exists under the given
+* name a new one will be created and returned.
+* @param keySerializer {@code TypeSerializer} for the keys of the 
timers.
+* @param namespaceSerializer {@code TypeSerializer} for the timer 
namespace.
+* @param triggerable The {@link Triggerable} that should be invoked 
when timers fire
+*
+* @param <K> The type of the timer keys.
+* @param <N> The type of the timer namespace.
+*/
+   public <K, N> InternalTimerService<N> getInternalTimerService(
+   String name,
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   Triggerable<K, N> triggerable) {
+
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService<K, N> service =
+   (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+   if (service == null) {
+   if (restoredServices != null && 
restoredServices.containsKey(name)) {
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService.RestoredTimers<K, N> restoredService =
+   (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   
getRuntimeContext().getProcessingTimeService(),
+   restoredService);
+
+   } else {
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   
getRuntimeContext().getProcessingTimeService());
+   }
+   timerServices.put(name, service);
+   }
+
+   return service;
+   }
+
+   public void processWatermark(Watermark mark) throws Exception {
+   for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+   service.advanceWatermark(mark.getTimestamp());
+   }
+   output.emitWatermark(mark);
+   }
+
+   public void processWatermark1(Watermark mark) throws Exception {
+   input1Watermark = mark.getTimestamp();
+   long newMin = Math.min(input1Watermark, input2Watermark);
+   if (newMin > combinedWatermark) {
+   combinedWatermark = newMin;
+   processWatermark(new Watermark(combinedWatermark));
+   }
+   }
+
+   public void processWatermark2(Watermark mark) throws Exception {
--- End diff --

As a general comment, somehow I don't like how two cases (one and 
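The two-input watermark combination in the quoted `processWatermark1`/`processWatermark2` methods can be sketched standalone. This is plain Java with hypothetical names, not the PR's code: each input tracks its own watermark, and the combined watermark is their minimum, forwarded only when it strictly advances:

```java
public class WatermarkCombiner {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Advances input 1; returns the new combined watermark, or null if unchanged. */
    public Long advance1(long ts) { input1Watermark = ts; return recompute(); }

    /** Advances input 2; returns the new combined watermark, or null if unchanged. */
    public Long advance2(long ts) { input2Watermark = ts; return recompute(); }

    private Long recompute() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return combinedWatermark; // would be emitted downstream
        }
        return null; // the slower input has not caught up yet
    }

    public static void main(String[] args) {
        WatermarkCombiner c = new WatermarkCombiner();
        if (c.advance1(10) != null) throw new AssertionError("input 2 still at MIN_VALUE");
        if (c.advance2(5) != 5L) throw new AssertionError("min(10, 5) = 5 advances");
        if (c.advance1(20) != null) throw new AssertionError("min(20, 5) = 5, no progress");
        if (c.advance2(30) != 20L) throw new AssertionError("min(20, 30) = 20 advances");
        System.out.println("ok");
    }
}
```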

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585561#comment-15585561
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83860194
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+    * Currently waiting watermark callbacks.
+    */
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = 
checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers<K, N> restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83861268
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
+   // ------------------------------------------------------------------------
+   //  Watermark handling
+   // ------------------------------------------------------------------------
+
+   /**
+* Returns a {@link InternalTimerService} that can be used to query 
current processing time
+* and event time and to set timers. An operator can have several timer 
services, where
+* each has its own namespace serializer. Timer services are 
differentiated by the string
+* key that is given when requesting them, if you call this method with 
the same key
+* multiple times you will get the same timer service instance in 
subsequent requests.
+*
+* Timers are always scoped to a key, the currently active key of a 
keyed stream operation.
+* When a timer fires, this key will also be set as the currently 
active key.
+*
+* Each timer has attached metadata, the namespace. Different timer 
services
+* can have a different namespace type. If you don't need namespace 
differentiation you
+* can use {@link VoidNamespaceSerializer} as the namespace serializer.
+*
+* @param name The name of the requested timer service. If no service 
exists under the given
+* name a new one will be created and returned.
+* @param keySerializer {@code TypeSerializer} for the keys of the 
timers.
+* @param namespaceSerializer {@code TypeSerializer} for the timer 
namespace.
+* @param triggerable The {@link Triggerable} that should be invoked 
when timers fire
+*
+* @param <K> The type of the timer keys.
+* @param <N> The type of the timer namespace.
+*/
+   public <K, N> InternalTimerService<N> getInternalTimerService(
+   String name,
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   Triggerable<K, N> triggerable) {
+
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService<K, N> service =
+   (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+   if (service == null) {
+   if (restoredServices != null && 
restoredServices.containsKey(name)) {
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService.RestoredTimers<K, N> restoredService =
--- End diff --

`contains()` + `remove()` seems a bit redundant for this use. I would just 
always remove and check the return value for null. 
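The suggested pattern, a single `remove()` whose return value is null-checked, looks like this in plain Java. The `takeRestoredService` helper is hypothetical, standing in for the restored-timers lookup discussed above:

```java
import java.util.HashMap;
import java.util.Map;

public class RemoveAndCheck {
    // Single map operation: remove() returns the previous value or null,
    // so a separate containsKey() probe is unnecessary.
    static Object takeRestoredService(Map<String, Object> restoredServices, String name) {
        return (restoredServices != null) ? restoredServices.remove(name) : null;
    }

    public static void main(String[] args) {
        Map<String, Object> restored = new HashMap<>();
        restored.put("window-timers", new Object());

        Object first = takeRestoredService(restored, "window-timers");
        Object second = takeRestoredService(restored, "window-timers");

        // The first call consumes the entry; the second call finds nothing.
        if (first == null || second != null) {
            throw new AssertionError("remove() should return the value once, then null");
        }
        System.out.println("ok");
    }
}
```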




[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585556#comment-15585556
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83856028
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+   @Test
+   public void testCurrentEventTime() throws Exception {
+
+   StreamTimelyFlatMap<Integer, Integer, String> operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.processWatermark(new Watermark(17));
+   testHarness.processElement(new StreamRecord<>(5, 12L));
+
+   testHarness.processWatermark(new Watermark(42));
+   testHarness.processElement(new StreamRecord<>(6, 13L));
+
+   ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new Watermark(17L));
+   expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+   expectedOutput.add(new Watermark(42L));
+   expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+   TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+   testHarness.close();
+   }
+
+   @Test
+   public void testCurrentProcessingTime() throws Exception {
+
+   StreamTimelyFlatMap<Integer, Integer, String> operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(17);
+   testHarness.processElement(new StreamRecord<>(5));
+
+   testHarness.setProcessingTime(42);
 

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585554#comment-15585554
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83854971
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
 ---
@@ -15,38 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.streaming.api.operators;
 
-package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.annotation.Internal;
 
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import java.util.concurrent.ScheduledFuture;
-
-class NoOpTimerService extends TimeServiceProvider {
-
-   private volatile boolean terminated;
-
-   @Override
-   public long getCurrentProcessingTime() {
-   return System.currentTimeMillis();
-   }
-
-   @Override
-   public ScheduledFuture registerTimer(long timestamp, Triggerable 
target) {
-   return null;
-   }
-
-   @Override
-   public boolean isTerminated() {
-   return terminated;
-   }
-
-   @Override
-   public void quiesceAndAwaitPending() {}
-
-   @Override
-   public void shutdownService() {
-   terminated = true;
-   }
+/**
+ * Interface for things that can be called by {@link InternalTimerService}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface Triggerable<K, N> {
--- End diff --

I found this name a bit confusing, it could lead to confusion with the 
concept of Triggers. Furthermore, there is another class with the same simple 
name in a different package. Maybe this could be called `TimerCallback`?


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}





[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585564#comment-15585564
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83858268
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Internal class for keeping track of in-flight timers.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public class InternalTimer<K, N> implements Comparable<InternalTimer<K, N>> {
+   private final long timestamp;
+   private final K key;
+   private final N namespace;
+
+   public InternalTimer(long timestamp, K key, N namespace) {
+   this.timestamp = timestamp;
+   this.key = key;
+   this.namespace = namespace;
+   }
+
+   public long getTimestamp() {
+   return timestamp;
+   }
+
+   public K getKey() {
+   return key;
+   }
+
+   public N getNamespace() {
+   return namespace;
+   }
+
+   @Override
+   public int compareTo(InternalTimer<K, N> o) {
+   return Long.compare(this.timestamp, o.timestamp);
--- End diff --

Method `compareTo` is not fully aligned with `equals`, which is permitted but 
strongly advised against by the documentation of `Comparable`.
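One way to make `compareTo` consistent with `equals`, as the `Comparable` javadoc recommends, is to add tie-breakers over the remaining fields. A standalone sketch with a hypothetical `Timer` class, not the PR's `InternalTimer`:

```java
import java.util.Objects;

public class Timer implements Comparable<Timer> {
    final long timestamp;
    final String key;
    final String namespace;

    Timer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public int compareTo(Timer o) {
        // Timestamp is the primary sort key; the remaining fields break ties,
        // so compareTo(...) == 0 exactly when equals(...) is true.
        int cmp = Long.compare(timestamp, o.timestamp);
        if (cmp == 0) cmp = key.compareTo(o.key);
        if (cmp == 0) cmp = namespace.compareTo(o.namespace);
        return cmp;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Timer)) return false;
        Timer o = (Timer) obj;
        return timestamp == o.timestamp && key.equals(o.key) && namespace.equals(o.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }

    public static void main(String[] args) {
        Timer a = new Timer(1L, "k", "ns");
        Timer b = new Timer(1L, "k", "ns");
        Timer c = new Timer(1L, "other", "ns");
        if (!(a.compareTo(b) == 0 && a.equals(b))) throw new AssertionError();
        if (!(a.compareTo(c) != 0 && !a.equals(c))) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Comparing only on the timestamp (as in the quoted code) is still a valid total preorder for a priority queue, but equal-ordered, non-equal timers can surprise callers of sorted sets and maps.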


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}









[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83857663
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them
+ * into zero, one, or more elements. Typical applications are splitting elements, or unnesting lists
+ * and arrays.
+ *
+ * A {@code TimelyFlatMapFunction} can, in addition to the 
functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set 
timers and react
+ * to them firing.
+ *
+ * <pre>{@code
+ * DataStream<X> input = ...;
+ *
+ * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }</pre>
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+   /**
+* The core method of the {@code TimelyFlatMapFunction}. Takes an 
element from the input data set and transforms
+* it into zero, one, or more elements.
+*
+* @param value The input value.
+* @param timerService A {@link TimerService} that allows setting 
timers and querying the
+*current time.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param timeDomain The {@link TimeDomain} of the firing timer.
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                     current time.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception;
--- End diff --

I wonder if `TimeDomain` and `TimerService` should be parameters to the methods in this
interface. I assume both remain stable for the lifetime of the UDF and could be passed once in
some init method that can also be pre-implemented in a `RichTimelyFlatMapFunction`. Maybe there
is a good reason against this, but I like to keep the number of parameters small when possible.
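A self-contained sketch of that alternative, for illustration only — the `TimerService` stub, the `open` method, and the `RichTimelyFlatMapFunction` base class below are hypothetical stand-ins, not the API from this PR:

```java
// Hypothetical stand-in for the timer service, reduced to a single method.
interface TimerService {
    long currentProcessingTime();
}

// The reviewer's idea: inject the service once via an init method that a
// rich base class pre-implements, so per-record calls need fewer parameters.
abstract class RichTimelyFlatMapFunction<I, O> {
    private TimerService timerService;

    // Called once before any element is processed.
    public void open(TimerService timerService) {
        this.timerService = timerService;
    }

    protected TimerService timerService() {
        return timerService;
    }

    // Element processing no longer takes the service as a parameter.
    public abstract O flatMap(I value);
}

class TimestampingFlatMap extends RichTimelyFlatMapFunction<String, String> {
    @Override
    public String flatMap(String value) {
        return value + " @ " + timerService().currentProcessingTime();
    }
}
```

A trade-off not visible in this sketch: in keyed operators the service's behavior can depend on the current key, which may be one reason to keep it as a per-call parameter.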


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585562#comment-15585562
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83858860
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for working with time and timers.
+ *
+ * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
+ * that allows to specify a key and a namespace to which timers should be scoped.
+ *
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface InternalTimerService<N> {
+
+   /** Returns the current processing time. */
+   long currentProcessingTime();
+
+   /** Returns the current event time. */
+   long currentWatermark();
--- End diff --

The corresponding method in the public interface is called `currentEventTime`. Does it make
sense to keep both method names in sync?
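To make the naming question concrete, here is a toy adapter between the two interfaces (both interfaces below are simplified stand-ins for the ones in the PR):

```java
// Simplified stand-ins for the two interfaces under discussion.
interface TimerService {                    // public-facing API
    long currentEventTime();
}

interface InternalTimerService {            // internal API
    long currentWatermark();
}

// Bridging internal to public makes the naming mismatch explicit: the two
// methods return the same value under different names, which is what the
// review comment suggests aligning.
class TimerServiceAdapter implements TimerService {
    private final InternalTimerService internal;

    TimerServiceAdapter(InternalTimerService internal) {
        this.internal = internal;
    }

    @Override
    public long currentEventTime() {
        return internal.currentWatermark();
    }
}
```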


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}
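A minimal sketch of how a runtime could drive the proposed interface (the `Watermark`, `EventTimeFunction`, and `MyMapper` types mirror the proposal above as self-contained stand-ins; the `Driver` loop is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the types in the proposal.
class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

interface EventTimeFunction {
    void onWatermark(Watermark watermark);
}

class MyMapper implements EventTimeFunction {
    private long currentEventTime = Long.MIN_VALUE;

    public String map(String value) {
        return value + " @ " + currentEventTime;
    }

    @Override
    public void onWatermark(Watermark watermark) {
        currentEventTime = watermark.getTimestamp();
    }
}

// Hypothetical driver: the operator notifies the UDF of each watermark so
// that subsequent map() calls observe the advanced event time.
class Driver {
    static List<String> run(MyMapper mapper) {
        List<String> out = new ArrayList<>();
        mapper.onWatermark(new Watermark(17));
        out.add(mapper.map("a"));
        mapper.onWatermark(new Watermark(42));
        out.add(mapper.map("b"));
        return out;
    }
}
```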



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585559#comment-15585559
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83857663
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take elements and transform them
+ * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting
+ * lists and arrays.
+ *
+ * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
+ * to them firing.
+ *
+ * <pre>{@code
+ * DataStream<X> input = ...;
+ *
+ * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }</pre>
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+	/**
+	 * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input
+	 * data set and transforms it into zero, one, or more elements.
+	 *
+	 * @param value The input value.
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                     current time.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param timeDomain The {@link TimeDomain} of the firing timer.
+	 * @param timerService A {@link TimerService} that allows setting timers and querying the
+	 *                     current time.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception;
--- End diff --

I wonder if `TimeDomain` and `TimerService` should be parameters to the methods in this
interface. I assume both remain stable for the lifetime of the UDF and could be passed once in
some init method that can also be pre-implemented in a `RichTimelyFlatMapFunction`. Maybe there
is a good reason against this, but I like to keep the number of parameters small when possible.


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83860194
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
+
+	private final TypeSerializer<K> keySerializer;
+
+	private final TypeSerializer<N> namespaceSerializer;
+
+	private final ProcessingTimeService processingTimeService;
+
+	private long currentWatermark = Long.MIN_VALUE;
+
+	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+	private final KeyContext keyContext;
+
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
+	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+	private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+	protected ScheduledFuture<?> nextTimer = null;
+
+	/**
+	 * Currently waiting watermark callbacks.
+	 */
+	private final Set<InternalTimer<K, N>> watermarkTimers;
+	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
+		watermarkTimers = new HashSet<>();
+		watermarkTimersQueue = new PriorityQueue<>(100);
+
+		processingTimeTimers = new HashSet<>();
+		processingTimeTimersQueue = new PriorityQueue<>(100);
+	}
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService,
+			RestoredTimers restoredTimers) {
+
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = restoredTimers.watermarkTimers;
+   watermarkTimersQueue = 

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585558#comment-15585558
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83855317
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+   @Test
+   public void testCurrentEventTime() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.processWatermark(new Watermark(17));
+   testHarness.processElement(new StreamRecord<>(5, 12L));
+
+   testHarness.processWatermark(new Watermark(42));
+   testHarness.processElement(new StreamRecord<>(6, 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new Watermark(17L));
+   expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+   expectedOutput.add(new Watermark(42L));
+   expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+   testHarness.close();
+   }
+
+   @Test
+   public void testCurrentProcessingTime() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(17);
+   testHarness.processElement(new StreamRecord<>(5));
+
+   testHarness.setProcessingTime(42);
 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83856028
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+   @Test
+   public void testCurrentEventTime() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.processWatermark(new Watermark(17));
+   testHarness.processElement(new StreamRecord<>(5, 12L));
+
+   testHarness.processWatermark(new Watermark(42));
+   testHarness.processElement(new StreamRecord<>(6, 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new Watermark(17L));
+   expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+   expectedOutput.add(new Watermark(42L));
+   expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+   testHarness.close();
+   }
+
+   @Test
+   public void testCurrentProcessingTime() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(17);
+   testHarness.processElement(new StreamRecord<>(5));
+
+   testHarness.setProcessingTime(42);
+   testHarness.processElement(new StreamRecord<>(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new StreamRecord<>("5PT:17"));
+   

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83859446
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
+
+	private final TypeSerializer<K> keySerializer;
+
+	private final TypeSerializer<N> namespaceSerializer;
+
+	private final ProcessingTimeService processingTimeService;
+
+	private long currentWatermark = Long.MIN_VALUE;
+
+	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+	private final KeyContext keyContext;
+
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
+	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+	private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+	protected ScheduledFuture<?> nextTimer = null;
+
+	/**
+	 * Currently waiting watermark callbacks.
+	 */
+	private final Set<InternalTimer<K, N>> watermarkTimers;
+	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
+		watermarkTimers = new HashSet<>();
+		watermarkTimersQueue = new PriorityQueue<>(100);
+
+		processingTimeTimers = new HashSet<>();
+		processingTimeTimersQueue = new PriorityQueue<>(100);
+	}
+
+	public HeapInternalTimerService(
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer,
+			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService,
+			RestoredTimers restoredTimers) {
+
+		this.keySerializer = checkNotNull(keySerializer);
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.triggerTarget = checkNotNull(triggerTarget);
+		this.keyContext = keyContext;
+		this.processingTimeService = checkNotNull(processingTimeService);
+
--- End diff --

RestoredTimers are serialized with their typeserializers. It could make 
sense to have some 

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585563#comment-15585563
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83860732
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
 ---
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link HeapInternalTimerService}.
+ */
+public class HeapInternalTimerServiceTest {
+
+	private static InternalTimer<Integer, String> anyInternalTimer() {
+		return any();
+	}
+
+	/**
+	 * Verify that we only ever have one processing-time task registered at the
+	 * {@link ProcessingTimeService}.
+	 */
+	@Test
+	public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+   TestKeyContext keyContext = new TestKeyContext();
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+   keyContext.setCurrentKey(0);
+
+   timerService.registerProcessingTimeTimer("ciao", 10);
+   timerService.registerProcessingTimeTimer("ciao", 20);
+   timerService.registerProcessingTimeTimer("ciao", 30);
+   timerService.registerProcessingTimeTimer("hello", 10);
+   timerService.registerProcessingTimeTimer("hello", 20);
+
+   assertEquals(5, timerService.numProcessingTimeTimers());
+   assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+   assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
+
+   assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+   processingTimeService.setCurrentTime(10);
+
+   assertEquals(3, timerService.numProcessingTimeTimers());
+   assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+   assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+   assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+   processingTimeService.setCurrentTime(20);
+
+   assertEquals(1, 
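The invariant this test pins down — many logical timers, but at most one physical timer registered at the service — can be sketched in isolation (the classes below are simplified stand-ins; the real implementation also deduplicates timers via a `Set` and scopes them by key and namespace):

```java
import java.util.PriorityQueue;

// Simplified stand-in for a processing-time service that holds at most one
// pending physical timer at a time.
class OneShotTimerService {
    private Long pending;                  // timestamp of the single physical timer, if any

    void schedule(long timestamp) { pending = timestamp; }
    Long pendingTimer() { return pending; }
}

// Coalesces many logical timers onto one physical timer: only the earliest
// outstanding timestamp is ever registered at the service.
class CoalescingTimers {
    private final PriorityQueue<Long> logical = new PriorityQueue<>();
    private final OneShotTimerService service;

    CoalescingTimers(OneShotTimerService service) { this.service = service; }

    void register(long timestamp) {
        Long head = logical.peek();
        logical.add(timestamp);
        // Re-arm the physical timer only when the new timer becomes the earliest.
        if (head == null || timestamp < head) {
            service.schedule(timestamp);
        }
    }

    // Drop (fire) all logical timers due at `time`, then re-arm for the next one.
    void advanceTo(long time) {
        while (logical.peek() != null && logical.peek() <= time) {
            logical.poll();
        }
        if (logical.peek() != null) {
            service.schedule(logical.peek());
        }
    }

    int numLogicalTimers() { return logical.size(); }
}
```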

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1558#comment-1558
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83859446
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable<K, N> {
+
+   private final TypeSerializer<K> keySerializer;
+
+   private final TypeSerializer<N> namespaceSerializer;
+
+   private final ProcessingTimeService processingTimeService;
+
+   private long currentWatermark = Long.MIN_VALUE;
+
+   private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
+
+   private final KeyContext keyContext;
+
+   /**
+* Processing time timers that are currently in-flight.
+*/
+   private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+   private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+   protected ScheduledFuture<?> nextTimer = null;
+
+   /**
+* Currently waiting watermark callbacks.
+*/
+   private final Set<InternalTimer<K, N>> watermarkTimers;
+   private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService) {
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   this.keyContext = keyContext;
+   this.processingTimeService = checkNotNull(processingTimeService);
+
+   watermarkTimers = new HashSet<>();
+   watermarkTimersQueue = new PriorityQueue<>(100);
+
+   processingTimeTimers = new HashSet<>();
+   processingTimeTimersQueue = new PriorityQueue<>(100);
+   }
+
+   public HeapInternalTimerService(
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   RestoredTimers<K, N> restoredTimers) {
+
+   this.keySerializer = checkNotNull(keySerializer);
+   this.namespaceSerializer = checkNotNull(namespaceSerializer);
+   this.triggerTarget = checkNotNull(triggerTarget);
+   
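The timer-service design quoted above pairs a priority queue (timestamp ordering) with a set (deduplication), keeping only a single pending "physical" timer at a time. A minimal standalone sketch of that idea, using hypothetical class and method names rather than Flink's actual API:

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Simplified, single-threaded illustration; not the actual HeapInternalTimerService.
class HeapTimerSketch {
    // The queue yields timers in timestamp order; the set deduplicates
    // registrations of the same timestamp.
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    private final Set<Long> registered = new HashSet<>();

    void register(long timestamp) {
        if (registered.add(timestamp)) {
            queue.add(timestamp);
        }
    }

    /** Fires all timers with timestamp <= time; returns how many fired. */
    int advanceTo(long time) {
        int fired = 0;
        while (!queue.isEmpty() && queue.peek() <= time) {
            registered.remove(queue.poll());
            fired++;
        }
        return fired;
    }

    /** The earliest timer: the single "physical" timer worth scheduling. */
    Long nextTimer() {
        return queue.peek();
    }
}
```

Only the head of the queue ever needs a physical timer at the `ProcessingTimeService`; when it fires, the next head (if any) is scheduled.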

[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585560#comment-15585560
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83862277
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -390,4 +425,141 @@ public void close() {
output.close();
}
}
+
+   // ------------------------------------------------------------------
+   //  Watermark handling
+   // ------------------------------------------------------------------
+
+   /**
+* Returns an {@link InternalTimerService} that can be used to query the current
+* processing time and event time and to set timers. An operator can have several
+* timer services, where each has its own namespace serializer. Timer services are
+* differentiated by the string key that is given when requesting them; if you call
+* this method with the same key multiple times you will get the same timer service
+* instance in subsequent requests.
+*
+* Timers are always scoped to a key, the currently active key of a keyed stream
+* operation. When a timer fires, this key will also be set as the currently active key.
+*
+* Each timer has attached metadata, the namespace. Different timer services can
+* have a different namespace type. If you don't need namespace differentiation you
+* can use {@link VoidNamespaceSerializer} as the namespace serializer.
+*
+* @param name The name of the requested timer service. If no service exists under
+* the given name a new one will be created and returned.
+* @param keySerializer {@code TypeSerializer} for the keys of the timers.
+* @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+* @param triggerable The {@link Triggerable} that should be invoked when timers fire.
+*
+* @param <K> The type of the timer keys.
+* @param <N> The type of the timer namespace.
+*/
+   public <K, N> InternalTimerService<N> getInternalTimerService(
+   String name,
+   TypeSerializer<K> keySerializer,
+   TypeSerializer<N> namespaceSerializer,
+   Triggerable<K, N> triggerable) {
+
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService<K, N> service =
+   (HeapInternalTimerService<K, N>) timerServices.get(name);
+
+   if (service == null) {
+   if (restoredServices != null && restoredServices.containsKey(name)) {
+   @SuppressWarnings("unchecked")
+   HeapInternalTimerService.RestoredTimers<K, N> restoredService =
+   (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   getRuntimeContext().getProcessingTimeService(),
+   restoredService);
+
+   } else {
+   service = new HeapInternalTimerService<>(
+   keySerializer,
+   namespaceSerializer,
+   triggerable,
+   this,
+   getRuntimeContext().getProcessingTimeService());
+   }
+   timerServices.put(name, service);
+   }
+
+   return service;
+   }
+
+   public void processWatermark(Watermark mark) throws Exception {
+   for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+   service.advanceWatermark(mark.getTimestamp());
+   }
+   output.emitWatermark(mark);
+   }
+
+   public void processWatermark1(Watermark mark) throws Exception {
+   input1Watermark = mark.getTimestamp();
+   long newMin = Math.min(input1Watermark, input2Watermark);
+   if (newMin > combinedWatermark) {
+   combinedWatermark = newMin;
+   
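The two-input watermark logic above (truncated in this message) advances the combined watermark to the minimum of both input watermarks. A minimal standalone sketch of that combination rule, with hypothetical names rather than the actual operator class:

```java
// Illustrative sketch of two-input watermark combination; mirrors the
// min-combination logic quoted above, not the actual Flink operator.
class WatermarkCombiner {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Returns the new combined watermark, or -1 if it did not advance. */
    long advanceInput1(long timestamp) {
        input1Watermark = timestamp;
        return tryAdvance();
    }

    long advanceInput2(long timestamp) {
        input2Watermark = timestamp;
        return tryAdvance();
    }

    private long tryAdvance() {
        // The combined watermark is the minimum of both inputs: it may only
        // advance once BOTH inputs have passed the new value, and it never
        // moves backwards.
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return combinedWatermark;
        }
        return -1L;
    }
}
```

The design choice here is that the slower input gates progress: records from the faster input cannot cause event-time timers to fire prematurely.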

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83858860
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for working with time and timers.
+ *
+ * This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
+ * that allows specifying a key and a namespace to which timers should be scoped.
+ *
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface InternalTimerService<N> {
+
+   /** Returns the current processing time. */
+   long currentProcessingTime();
+
+   /** Returns the current event time. */
+   long currentWatermark();
--- End diff --

The corresponding method in the public interface is called
`currentEventTime`. Does it make sense to keep both method names in sync?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83860732
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
 ---
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link HeapInternalTimerService}.
+ */
+public class HeapInternalTimerServiceTest {
+
+   private static InternalTimer anyInternalTimer() {
+   return any();
+   }
+
+   /**
+* Verify that we only ever have one processing-time task registered at 
the
+* {@link ProcessingTimeService}.
+*/
+   @Test
+   public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
+   @SuppressWarnings("unchecked")
+   Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+   TestKeyContext keyContext = new TestKeyContext();
+
+   TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+   HeapInternalTimerService<Integer, String> timerService =
+   createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+   keyContext.setCurrentKey(0);
+
+   timerService.registerProcessingTimeTimer("ciao", 10);
+   timerService.registerProcessingTimeTimer("ciao", 20);
+   timerService.registerProcessingTimeTimer("ciao", 30);
+   timerService.registerProcessingTimeTimer("hello", 10);
+   timerService.registerProcessingTimeTimer("hello", 20);
+
+   assertEquals(5, timerService.numProcessingTimeTimers());
+   assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+   assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
+
+   assertEquals(1, processingTimeService.getNumRegisteredTimers());
+   assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+   processingTimeService.setCurrentTime(10);
+
+   assertEquals(3, timerService.numProcessingTimeTimers());
+   assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+   assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+   assertEquals(1, processingTimeService.getNumRegisteredTimers());
+   assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+   processingTimeService.setCurrentTime(20);
+
+   assertEquals(1, timerService.numProcessingTimeTimers());
+   assertEquals(0, timerService.numProcessingTimeTimers("hello"));
+   assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+
+   assertEquals(1, 

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r83855317
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+   @Test
+   public void testCurrentEventTime() throws Exception {
+
+   StreamTimelyFlatMap operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.processWatermark(new Watermark(17));
+   testHarness.processElement(new StreamRecord<>(5, 12L));
+
+   testHarness.processWatermark(new Watermark(42));
+   testHarness.processElement(new StreamRecord<>(6, 13L));
+
+   ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new Watermark(17L));
+   expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+   expectedOutput.add(new Watermark(42L));
+   expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+   TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+   testHarness.close();
+   }
+
+   @Test
+   public void testCurrentProcessingTime() throws Exception {
+
+   StreamTimelyFlatMap operator =
+   new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+   testHarness.setup();
+   testHarness.open();
+
+   testHarness.setProcessingTime(17);
+   testHarness.processElement(new StreamRecord<>(5));
+
+   testHarness.setProcessingTime(42);
+   testHarness.processElement(new StreamRecord<>(6));
+
+   ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>();
+
+   expectedOutput.add(new StreamRecord<>("5PT:17"));
+   

[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585514#comment-15585514
 ] 

ASF GitHub Bot commented on FLINK-3322:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2510
  
@ggevay 
Do you have some time to check the last commit in this PR?


> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by a factor of more than 10. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
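The pooling idea behind {{taskmanager.memory.preallocate=true}}, reusing released segments instead of leaving them for the GC to reclaim at every superstep, can be sketched as follows. This is illustrative only: `SegmentPool` is a hypothetical class, not Flink's actual MemoryManager API.

```java
import java.util.ArrayDeque;

// Illustrative segment pool: allocate up front, recycle on release.
class SegmentPool {
    private final ArrayDeque<byte[]> pool = new ArrayDeque<>();
    private final int segmentSize;

    SegmentPool(int segmentSize, int preallocated) {
        this.segmentSize = segmentSize;
        for (int i = 0; i < preallocated; i++) {
            pool.push(new byte[segmentSize]); // allocate once, up front
        }
    }

    /** Reuses a pooled segment when available instead of allocating a fresh
     *  one each superstep, which would leave the old segment for the GC. */
    byte[] allocate() {
        byte[] seg = pool.poll();
        return seg != null ? seg : new byte[segmentSize];
    }

    void release(byte[] segment) {
        pool.push(segment); // return to the pool instead of dropping the reference
    }

    int available() {
        return pool.size();
    }
}
```

With pooling, steady-state iterations produce no garbage from managed memory, which is exactly the pressure the issue describes for `preallocate=false`.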


[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585538#comment-15585538
 ] 

ASF GitHub Bot commented on FLINK-4851:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2655

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR is based on #2651 

This PR introduces a `FatalErrorHandler` and the `MetricRegistry` to the 
RM. The `FatalErrorHandler` is used to handle fatal errors, while the 
`MetricRegistry` can be used to register metrics.

Apart from these changes, the PR restructures the code of the RM a little 
bit and fixes some blocking operations.

The PR also moves the `TestingFatalErrorHandler` into the util package of 
flink-runtime test. That way, it is usable across multiple tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink resourceManagerImprovements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2655


commit ddf35c4ddb04629cddebb2401488effe93416b70
Author: Till Rohrmann 
Date:   2016-10-17T14:22:16Z

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing the RpcEndpoint.start/shutDown to throw exceptions will help to 
let rpc endpoints
to quickly fail without having to use a callback like the FatalErrorHandler.

commit 9fdcc11f7730b3c7a6f061c17d0463ea3f21a9f9
Author: Till Rohrmann 
Date:   2016-10-17T14:03:02Z

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. 
The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds 
the MetricRegistry to the RM which can be used
to register metrics.

Apart from these changes the PR restructures the code of the RM a little 
bit and fixes some
blocking operations.

The PR also moves the TestingFatalErrorHandler into the util package of 
flink-runtime test. That way, it is usable across multiple tests.




> Add FatalErrorHandler and MetricRegistry to ResourceManager
> ---
>
> Key: FLINK-4851
> URL: https://issues.apache.org/jira/browse/FLINK-4851
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. 
> In order to harmonize the fatal error handling across all components, we 
> should introduce a {{FatalErrorHandler}}, which handles fatal errors. 
> Additionally, we should also give a {{MetricRegistry}} to the 
> {{ResourceManager}} so that it can report metrics.





[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-18 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585534#comment-15585534
 ] 

Maximilian Michels commented on FLINK-4829:
---

To mitigate the problem, we have best-effort reporting for accumulators now.

master: 783dca56eedc95f0a8974a9b50f2b532ca8cf849, 
d95929e0110b53f03452e1ad453de2522f79a6b8
release-1.1: c1d6b24600e40700fa06caa28bc81788d8e92386, 
210230c4ab44b84c28b9a62ff461de0955e67f8f

> Accumulators are not thread safe
> 
>
> Key: FLINK-4829
> URL: https://issues.apache.org/jira/browse/FLINK-4829
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live 
> accumulator snapshots which are sent to the {{JobManager}}, we've introduced 
> concurrent access to accumulators without properly guarding them against 
> concurrent modifications. So if an accumulator snapshot is taken for an 
> accumulator which is modified at the same time, it can cause a 
> {{ConcurrentModificationException}}, as reported by a user: 
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed 
> to serialize accumulators for task.
> java.util.ConcurrentModificationException
> at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
> at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
> at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
> at java.util.TreeMap.writeObject(TreeMap.java:2436)
> at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
> at java.util.HashMap.writeObject(HashMap.java:1362)
> at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
> at 
> org.apache.flink.util.SerializedValue.(SerializedValue.java:52)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorSnapshot.(AccumulatorSnapshot.java:58)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}



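The race described above, one thread serializing a {{TreeMap}} while another mutates it, and the usual copy-under-lock mitigation can be sketched as follows. This is illustrative only: `GuardedAccumulator` is a hypothetical class, not Flink's actual accumulator API.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative guard: snapshot by copying under a lock, so serialization
// never walks a map that a task thread is concurrently mutating.
class GuardedAccumulator {
    private final TreeMap<String, Long> counts = new TreeMap<>();
    private final Object lock = new Object();

    void add(String key) {
        synchronized (lock) {
            counts.merge(key, 1L, Long::sum);
        }
    }

    /** Copies the map under the lock; the caller (e.g. a heartbeat thread)
     *  can then serialize the copy without risking a
     *  ConcurrentModificationException from ongoing updates. */
    Map<String, Long> snapshot() {
        synchronized (lock) {
            return new TreeMap<>(counts);
        }
    }
}
```

Copying under the lock trades a small allocation per snapshot for isolation: later updates to the live map cannot affect a snapshot already taken.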


[GitHub] flink pull request #2649: [FLINK-4829] snapshot accumulators on a best-effor...

2016-10-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2649




[jira] [Commented] (FLINK-4829) Accumulators are not thread safe

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585520#comment-15585520
 ] 

ASF GitHub Bot commented on FLINK-4829:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2649


> Accumulators are not thread safe
> 
>
> Key: FLINK-4829
> URL: https://issues.apache.org/jira/browse/FLINK-4829
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> Flink's {{Accumulators}} are not thread safe. With the introduction of live 
> accumulator snapshots which are sent to the {{JobManager}}, we've introduced 
> concurrent access to accumulators without properly guarding them against 
> concurrent modifications. So if an accumulator snapshot is taken for an 
> accumulator which is modified at the same time, it can cause a 
> {{ConcurrentModificationException}}, as reported by a user: 
> {code}
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed 
> to serialize accumulators for task.
> java.util.ConcurrentModificationException
> at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
> at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
> at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
> at java.util.TreeMap.writeObject(TreeMap.java:2436)
> at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
> at java.util.HashMap.writeObject(HashMap.java:1362)
> at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
> at 
> org.apache.flink.util.SerializedValue.(SerializedValue.java:52)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorSnapshot.(AccumulatorSnapshot.java:58)
> at 
> org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
> {code}





[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...

2016-10-18 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2510
  
@ggevay 
Do you have some time to check the last commit in this PR?




[jira] [Created] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-18 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4851:


 Summary: Add FatalErrorHandler and MetricRegistry to 
ResourceManager
 Key: FLINK-4851
 URL: https://issues.apache.org/jira/browse/FLINK-4851
 Project: Flink
  Issue Type: Sub-task
  Components: ResourceManager
Reporter: Till Rohrmann
Assignee: Till Rohrmann


The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. In 
order to harmonize the fatal error handling across all components, we should 
introduce a {{FatalErrorHandler}}, which handles fatal errors. Additionally, we 
should also give a {{MetricRegistry}} to the {{ResourceManager}} so that it can 
report metrics.





[jira] [Assigned] (FLINK-2118) Table API fails on composite filter conditions

2016-10-18 Thread Alexander Shoshin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Shoshin reassigned FLINK-2118:


Assignee: Alexander Shoshin

> Table API fails on composite filter conditions
> --
>
> Key: FLINK-2118
> URL: https://issues.apache.org/jira/browse/FLINK-2118
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Alexander Shoshin
>
> Having a composite filter conditions such as 
> {code}
> myTable.filter('name !== "Pete" && 'name !== "Bob")
> {code}
> fails with the following error message:
> {code}
> ExpressionException: Non-boolean operand types String and String in Pete && 
> 'name
> {code}
> whereas 
> {code}
> myTable.filter( ('name !== "Pete") && ('name !== "Bob") )
> {code}
> works.





[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585425#comment-15585425
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Haha, that picture convinced me to actually add the test :smile: 


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Haha, that picture convinced me to actually add the test :smile: 




[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Yeah, this sort of covers it. Just afraid of such a situation here: 
https://twitter.com/thepracticaldev/status/687672086152753152




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585403#comment-15585403
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Yeah, this sort of covers it. Just afraid of such a situation here: 
https://twitter.com/thepracticaldev/status/687672086152753152


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585380#comment-15585380
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
We have the `TaskManagerProcessReapingTest` which tests that the 
TaskManager process properly exits when the TaskManager actor dies. In 
addition, there is `TaskManagerTest#testTerminationOnFatalError`, which tests 
that the `FatalError` message terminates the actor. 

Do you think we are already covered by this? We can certainly add a process 
reaper test variant that sends a `FatalError` message instead of the 
`PoisonPill`. 


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
We have the `TaskManagerProcessReapingTest` which tests that the 
TaskManager process properly exits when the TaskManager actor dies. In 
addition, there is `TaskManagerTest#testTerminationOnFatalError`, which tests 
that the `FatalError` message terminates the actor. 

Do you think we are already covered by this? We can certainly add a process 
reaper test variant that sends a `FatalError` message instead of the 
`PoisonPill`. 




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585352#comment-15585352
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
What do you think about a followup test, where we ensure that a fatal error 
notification on the TaskManager actually results in a process kill?


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
What do you think about a followup test, where we ensure that a fatal error 
notification on the TaskManager actually results in a process kill?




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585346#comment-15585346
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Renamed the `TaskOptions` class to `TaskManagerOptions` so that we can 
easily migrate the task manager options as a follow up. 


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Renamed the `TaskOptions` class to `TaskManagerOptions` so that we can 
easily migrate the task manager options as a follow up. 




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585344#comment-15585344
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83842005
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java 
---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws 
Exception {
verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in 
cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel 
via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after 
acquisition)
+* and cancel cannot complete because it also tries to acquire the same 
lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws 
Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = 
createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
--- End diff --

You can also give a message to `assertFalse` - I like `assertEquals` for 
printing the expected value, but if the expected value is false, `assertFalse` 
seems more natural to me...


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.



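The `assertEquals`-vs-`assertFalse` trade-off debated above can be illustrated with minimal stand-ins for JUnit's assertion behavior (the helper names and message formats below are illustrative approximations, not JUnit's exact implementation):

```java
public class AssertStyles {
    // Stand-in mirroring assertEquals: the failure message shows expected vs. actual.
    static void assertEqualsLike(Object expected, Object actual) {
        boolean equal = (expected == null) ? actual == null : expected.equals(actual);
        if (!equal) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    // Stand-in mirroring assertFalse(String, boolean): without the message
    // argument, a failure surfaces only as a bare AssertionError plus a stack trace.
    static void assertFalseLike(String message, boolean condition) {
        if (condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        Object msg = new Object();              // stands in for a TaskManager message
        boolean fatal = msg instanceof Integer; // false: not a FatalError-like type

        assertEqualsLike(false, fatal);                           // passes
        assertFalseLike("unexpected fatal error: " + msg, fatal); // passes; message used only on failure
        System.out.println("ok");
    }
}
```

Either style works; the message-carrying `assertFalse` overload gives the same diagnostic value as `assertEquals(false, ...)` while reading more naturally.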


[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library

2016-10-18 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585342#comment-15585342
 ] 

Till Rohrmann commented on FLINK-1807:
--

This could work as a workaround, but I don't think it's the proper solution to the 
problem. Instead, we should try to fix the static path problem. I.e., we could 
introduce an operator flag which allows marking an operator as dynamic. When 
determining the static path, not only the partial solution dataset but also the 
sampling operator (which actually lies on a static path) should be marked as 
dynamic. Then, for each iteration, we would start executing all dynamic operators.

> Stochastic gradient descent optimizer for ML library
> 
>
> Key: FLINK-1807
> URL: https://issues.apache.org/jira/browse/FLINK-1807
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Theodore Vasiloudis
>  Labels: ML
>
> Stochastic gradient descent (SGD) is a widely used optimization technique in 
> different ML algorithms. Thus, it would be helpful to provide a generalized 
> SGD implementation which can be instantiated with the respective gradient 
> computation. Such a building block would make the development of future 
> algorithms easier.





[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...

2016-10-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83842005
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java 
---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws 
Exception {
verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in 
cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel 
via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after 
acquisition)
+* and cancel cannot complete because it also tries to acquire the same 
lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws 
Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = 
createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
--- End diff --

You can also give a message to `assertFalse` - I like `assertEquals` for 
printing the expected value, but if the expected value is false, `assertFalse` 
seems more natural to me...




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585332#comment-15585332
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83841451
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1299,124 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   logger.error("Error in the task canceler", t);
+   }
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /**
+* Pass logger in order to prevent that the compiler needs to 
inject static bridge methods
+* to access it.
+*/
+   private final Logger logger;
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
+
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Logger logger,
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.logger = checkNotNull(logger);
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = 
checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = 
checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   taskCancellerLatch.await();
+   } catch (Exception e) {
+   String msg = String.format("Exception while 
waiting on task " +
+   "canceller to cancel task 
'%s'.", taskName);
+   taskManager.notifyFatalError(msg, e);
+   return;
+   }
+
+   long intervalNanos = 
TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+   long timeoutNanos = 
TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+   long deadline = System.nanoTime() + timeoutNanos;
+
+   try {
+   // Initial wait before interrupting periodically
+   Thread.sleep(interruptInterval);
+   } catch (InterruptedException ignored) {
+   }
+
+   // It is possible that the user code does not react to 
the task canceller.
+   // for that reason, we spawn this separate thread that 
repeatedly interrupts
+   // the user code until it exits. If the user code 
does not exit within
+   // the timeout, we notify the job manager about a fatal 
error.
+   while (executor.isAlive()) {
+   long now = System.nanoTime();
+
+   // build the stack trace of where the thread is 
stuck, for the log
+   StringBuilder bld = new StringBuilder();
+   

[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...

2016-10-18 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83841451
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1299,124 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   logger.error("Error in the task canceler", t);
+   }
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /**
+* Pass logger in order to prevent that the compiler needs to 
inject static bridge methods
+* to access it.
+*/
+   private final Logger logger;
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
+
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Logger logger,
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.logger = checkNotNull(logger);
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = 
checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = 
checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   taskCancellerLatch.await();
+   } catch (Exception e) {
+   String msg = String.format("Exception while 
waiting on task " +
+   "canceller to cancel task 
'%s'.", taskName);
+   taskManager.notifyFatalError(msg, e);
+   return;
+   }
+
+   long intervalNanos = 
TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+   long timeoutNanos = 
TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+   long deadline = System.nanoTime() + timeoutNanos;
+
+   try {
+   // Initial wait before interrupting periodically
+   Thread.sleep(interruptInterval);
+   } catch (InterruptedException ignored) {
+   }
+
+   // It is possible that the user code does not react to 
the task canceller.
+   // for that reason, we spawn this separate thread that 
repeatedly interrupts
+   // the user code until it exits. If the user code 
does not exit within
+   // the timeout, we notify the job manager about a fatal 
error.
+   while (executor.isAlive()) {
+   long now = System.nanoTime();
+
+   // build the stack trace of where the thread is 
stuck, for the log
+   StringBuilder bld = new StringBuilder();
+   StackTraceElement[] stack = 
executor.getStackTrace();
+   for (StackTraceElement e : stack) {
+   bld.append(e).append('\n');
+   }
 
- 
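The watchdog logic quoted in the diff above (periodic interrupts until a deadline, then escalation to a fatal error) can be sketched in isolation. The names below are illustrative, not Flink's actual classes, and the real watchdog additionally logs the stuck thread's stack trace:

```java
import java.util.concurrent.TimeUnit;

public class WatchdogSketch {
    // Repeatedly interrupts 'worker' every intervalMillis until it exits or
    // timeoutMillis elapses; returns true if the worker terminated in time.
    // A 'false' return is the point where Flink would notify a fatal error.
    static boolean watch(Thread worker, long intervalMillis, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime()
                + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS);
        while (worker.isAlive() && System.nanoTime() < deadline) {
            worker.interrupt();          // poke the stuck user code
            worker.join(intervalMillis); // wait a bit before the next interrupt
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // "stuck" user code, but interruptible
            } catch (InterruptedException ignored) {
                // reacts to the interrupt and exits
            }
        });
        worker.start();
        System.out.println(watch(worker, 5, 500)); // prints: true (worker reacted)
    }
}
```

User code that swallows interrupts and never exits would make `watch` return false after the timeout, which is exactly the case where the TaskManager must kill itself to avoid leaking resources.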

[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...

2016-10-18 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83840872
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java 
---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws 
Exception {
verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in 
cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel 
via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after 
acquisition)
+* and cancel cannot complete because it also tries to acquire the same 
lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws 
Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = 
createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
--- End diff --

`assertFalse` without a message only fails by printing `AssertionError`, and you 
have to check the stack trace, whereas `assertEquals` prints a message with the 
expected and actual inputs.




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585325#comment-15585325
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83840872
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java 
---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws 
Exception {
verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in 
cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel 
via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after 
acquisition)
+* and cancel cannot complete because it also tries to acquire the same 
lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws 
Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = 
createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
--- End diff --

`assertFalse` without a message only fails by printing `AssertionError`, and you 
have to check the stack trace, whereas `assertEquals` prints a message with the 
expected and actual inputs.


> TaskManager should commit suicide after cancellation failure
> 
>
> Key: FLINK-4715
> URL: https://issues.apache.org/jira/browse/FLINK-4715
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a 
> given time, the {{TaskManager}} should kill itself. That way we guarantee 
> that there is no resource leak. 
> This behaviour acts as a safety-net against faulty user code.





[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585320#comment-15585320
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83840545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java 
---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws 
Exception {
verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in 
cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel 
via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after 
acquisition)
+* and cancel cannot complete because it also tries to acquire the same 
lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws 
Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = 
createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
--- End diff --

how about adding `task.getExecutingThread().join()` instead of using 
the trigger latch? Seems more intuitive and safer.
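The suggestion above can be sketched like this (a hypothetical helper, not the actual `Task` API): blocking on `Thread.join()` guarantees the executing thread has terminated, with no extra latch state to keep in sync.

```java
public class JoinInsteadOfLatch {

    // Wait for the task's executing thread via join() rather than a
    // shared trigger latch: join() returns only once the thread has
    // actually terminated, so no separate countdown state is needed.
    static boolean awaitTermination(Thread executingThread) throws InterruptedException {
        executingThread.join();
        return !executingThread.isAlive(); // true after join() returns normally
    }

    public static void main(String[] args) throws InterruptedException {
        Thread task = new Thread(() -> { /* simulated task body */ });
        task.start();
        System.out.println(awaitTermination(task)); // prints "true"
    }
}
```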







[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...

2016-10-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83840545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java 
---
@@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws 
Exception {
verify(inputGate, 
times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
 
+   /**
+* Tests that interrupt happens via watch dog if canceller is stuck in 
cancel.
+* Task cancellation blocks the task canceller. Interrupt after cancel 
via
+* cancellation watch dog.
+*/
+   @Test
+   public void testWatchDogInterruptsTask() throws Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = createTask(InvokableBlockingInCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
+
+   // No fatal error
+   for (Object msg : taskManagerMessages) {
+   assertEquals(false, msg instanceof 
TaskManagerMessages.FatalError);
+   }
+   }
+
+   /**
+* The invoke() method holds a lock (trigger awaitLatch after 
acquisition)
+* and cancel cannot complete because it also tries to acquire the same 
lock.
+* This is resolved by the watch dog, no fatal error.
+*/
+   @Test
+   public void testInterruptableSharedLockInInvokeAndCancel() throws 
Exception {
+   Configuration config = new Configuration();
+   config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5);
+   config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50);
+
+   Task task = 
createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
+   task.startTaskThread();
+
+   awaitLatch.await();
+
+   task.cancelExecution();
+
+   triggerLatch.await();
--- End diff --

how about adding `task.getExecutingThread().join()` instead of using 
the trigger latch? Seems more intuitive and safer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585309#comment-15585309
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83839720
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1299,124 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   logger.error("Error in the task canceler", t);
+   }
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /**
+* Pass logger in order to prevent that the compiler needs to 
inject static bridge methods
+* to access it.
+*/
+   private final Logger logger;
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
+
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Logger logger,
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.logger = checkNotNull(logger);
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = 
checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = 
checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   taskCancellerLatch.await();
+   } catch (Exception e) {
+   String msg = String.format("Exception while 
waiting on task " +
+   "canceller to cancel task 
'%s'.", taskName);
+   taskManager.notifyFatalError(msg, e);
+   return;
+   }
+
+   long intervalNanos = 
TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+   long timeoutNanos = 
TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+   long deadline = System.nanoTime() + timeoutNanos;
+
+   try {
+   // Initial wait before interrupting periodically
+   Thread.sleep(interruptInterval);
+   } catch (InterruptedException ignored) {
+   }
+
+   // It is possible that the user code does not react to 
the task canceller.
+   // For that reason, we spawn this separate thread that 
repeatedly interrupts
+   // the user code until it exits. If the user code 
does not exit within
+   // the timeout, we notify the job manager about a fatal 
error.
+   while (executor.isAlive()) {
+   long now = System.nanoTime();
+
+   // build the stack trace of where the thread is 
stuck, for the log
+   StringBuilder bld = new StringBuilder();
+   

[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...

2016-10-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2652#discussion_r83839720
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1251,33 +1299,124 @@ public void run() {
catch (InterruptedException e) {
// we can ignore this
}
+   }
+   catch (Throwable t) {
+   logger.error("Error in the task canceler", t);
+   }
+   }
+   }
+
+   /**
+* Watchdog for the cancellation. If the task is stuck in cancellation,
+* we notify the task manager about a fatal error.
+*/
+   private static class TaskCancellationWatchDog extends TimerTask {
+
+   /**
+* Pass logger in order to prevent that the compiler needs to 
inject static bridge methods
+* to access it.
+*/
+   private final Logger logger;
+
+   /** Thread executing the Task. */
+   private final Thread executor;
+
+   /** Interrupt interval. */
+   private final long interruptInterval;
+
+   /** Timeout after which a fatal error notification happens. */
+   private final long interruptTimeout;
+
+   /** TaskManager to notify about a timeout */
+   private final TaskManagerConnection taskManager;
+
+   /** Task name (for logging and error messages). */
+   private final String taskName;
+
+   /** Synchronization with the {@link TaskCanceler} thread. */
+   private final CountDownLatch taskCancellerLatch;
+
+   public TaskCancellationWatchDog(
+   Logger logger,
+   Thread executor,
+   long interruptInterval,
+   long interruptTimeout,
+   TaskManagerConnection taskManager,
+   String taskName,
+   CountDownLatch taskCancellerLatch) {
+
+   this.logger = checkNotNull(logger);
+   this.executor = checkNotNull(executor);
+   this.interruptInterval = 
checkNotNull(interruptInterval);
+   this.interruptTimeout = checkNotNull(interruptTimeout);
+   this.taskManager = checkNotNull(taskManager);
+   this.taskName = checkNotNull(taskName);
+   this.taskCancellerLatch = 
checkNotNull(taskCancellerLatch);
+   }
+
+   @Override
+   public void run() {
+   try {
+   // Synchronize with task canceler
+   taskCancellerLatch.await();
+   } catch (Exception e) {
+   String msg = String.format("Exception while 
waiting on task " +
+   "canceller to cancel task 
'%s'.", taskName);
+   taskManager.notifyFatalError(msg, e);
+   return;
+   }
+
+   long intervalNanos = 
TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS);
+   long timeoutNanos = 
TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS);
+   long deadline = System.nanoTime() + timeoutNanos;
+
+   try {
+   // Initial wait before interrupting periodically
+   Thread.sleep(interruptInterval);
+   } catch (InterruptedException ignored) {
+   }
+
+   // It is possible that the user code does not react to 
the task canceller.
+   // For that reason, we spawn this separate thread that 
repeatedly interrupts
+   // the user code until it exits. If the user code 
does not exit within
+   // the timeout, we notify the job manager about a fatal 
error.
+   while (executor.isAlive()) {
+   long now = System.nanoTime();
+
+   // build the stack trace of where the thread is 
stuck, for the log
+   StringBuilder bld = new StringBuilder();
+   StackTraceElement[] stack = 
executor.getStackTrace();
+   for (StackTraceElement e : stack) {
+   bld.append(e).append('\n');
+   }
 
- 
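The watchdog loop quoted (and truncated) above amounts to the following sketch, with illustrative names: periodically interrupt the executing thread and report failure once the deadline passes, leaving the fatal-error escalation to the caller.

```java
import java.util.concurrent.TimeUnit;

public class CancellationWatchdogSketch {

    // Repeatedly interrupt the executing thread until it exits or the
    // timeout elapses. Returns true if the thread terminated in time;
    // false means the caller should escalate with a fatal error.
    static boolean interruptUntilTerminated(Thread executor,
                                            long intervalMillis,
                                            long timeoutMillis) throws InterruptedException {
        final long deadline = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (executor.isAlive() && System.nanoTime() < deadline) {
            executor.interrupt();          // poke the (possibly stuck) user code
            executor.join(intervalMillis); // give it one interval to exit
        }
        return !executor.isAlive();
    }
}
```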

[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585283#comment-15585283
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Should we have one class `TaskManagerOptions`? To not spread the config 
over too many classes.
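Such a single holder class could look like the plain-Java sketch below. The key names and default values here are illustrative assumptions, not Flink's actual configuration, and the real implementation would use Flink's config-option types rather than bare constants:

```java
public final class TaskManagerOptionsSketch {

    private TaskManagerOptionsSketch() {} // constants holder, no instances

    // All task-cancellation settings live in one place instead of being
    // spread across several classes. Keys and defaults are illustrative.
    public static final String CANCELLATION_INTERVAL_KEY = "task.cancellation-interval";
    public static final long CANCELLATION_INTERVAL_DEFAULT = 30_000L;  // ms

    public static final String CANCELLATION_TIMEOUT_KEY = "task.cancellation-timeout";
    public static final long CANCELLATION_TIMEOUT_DEFAULT = 180_000L;  // ms
}
```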







[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2652
  
Should we have one class `TaskManagerOptions`? To not spread the config 
over too many classes.




[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585275#comment-15585275
 ] 

ASF GitHub Bot commented on FLINK-4715:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Thanks for the valuable feedback. Some of the errors were a little sloppy 
on my side. Sorry for that. I addressed all your comments.







[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...

2016-10-18 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2652
  
Thanks for the valuable feedback. Some of the errors were a little sloppy 
on my side. Sorry for that. I addressed all your comments.




[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector

2016-10-18 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4850:
---
Description: 
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML.





  was:
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkMl.




> FlinkML - SVM predict Operation for Vector and not LabeledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>
> It seems that SVM has predict operation for Vector and not LabeledVector.
> It impacts QuickStart guide for FlinkML.





[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...

2016-10-18 Thread ggevay
Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2642
  
Could you please explain why the first and last commits 
(65b3ff78f100ed1b13ec2fcc727f4869823b5918 and 
ba4f429043f3985db5ab37a3902a6fee15a7440e) are necessary? Are they related to 
FLINK-3999?



