[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931036#comment-15931036
 ] 

ASF GitHub Bot commented on FLINK-6097:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3560#discussion_r106772134
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -227,18 +227,23 @@ object ProjectionTranslator {
    * @param exprs a list of expressions to extract
    * @return a list of field references extracted from the given expressions
    */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = {
-    exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = {
+    exprs.foldLeft(List[NamedExpression]()) {
       (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
-    }.toSeq
+    }
   }

   private def identifyFieldReferences(
       expr: Expression,
-      fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
+      fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match {
--- End diff --

The order really depends on how we extract fields from the various kinds of 
expressions. For a `BinaryExpression`, we first extract the `left child` and then 
the `right child`. For function calls, we extract the fields from the parameters 
in left-to-right order. A more complex example is `over`: imagine an aggregate on 
a partitioned window. Should the fields appearing in the aggregate or the fields 
being partitioned on be considered first?

So I think this kind of order is hard to define and hard to keep consistent; it 
will change easily whenever we modify the code. We should not rely on it for 
anything.
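
To make the traversal-order point concrete, here is a simplified sketch (illustration only, with stand-in expression classes, not the actual ProjectionTranslator code) of how the result order is fixed purely by the traversal that the implementation happens to use:

{code}
// Stand-in expression tree, for illustration only.
sealed trait Expr
case class FieldRef(name: String) extends Expr
case class BinaryExpr(left: Expr, right: Expr) extends Expr
case class Call(args: Seq[Expr]) extends Expr

// The output order is dictated by the traversal: left child before right child,
// call arguments left to right. Changing the traversal changes the order.
def collectFields(expr: Expr, acc: List[FieldRef]): List[FieldRef] = expr match {
  case f: FieldRef      => if (acc.contains(f)) acc else acc :+ f
  case BinaryExpr(l, r) => collectFields(r, collectFields(l, acc))
  case Call(args)       => args.foldLeft(acc)((a, e) => collectFields(e, a))
}
{code}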


> Guaranteed the order of the extracted field references
> --
>
> Key: FLINK-6097
> URL: https://issues.apache.org/jira/browse/FLINK-6097
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When we tried to implement the `OVER window` Table API, the first version of the 
> prototype did not take into account that the table fields could be out of order 
> when implementing the `translateToPlan` method; we set the `outputRow` fields 
> from `inputRow` according to the initial order of the table field indexes.
> As long as the select statement projected fewer than 5 columns this worked well. 
> Unfortunately, when the number of projections was bigger than 4 (>= 5), we got 
> random results. Debugging showed that the `ProjectionTranslator#identifyFieldReferences` 
> method uses a `Set` to temporarily store the fields: when the number of elements 
> in the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data structures; 
> when the number of elements is greater than or equal to 5, the Set switches to 
> HashSet#HashTrieSet, which causes the data to go out of order.
> e.g.:
> Add the following elements in turn:
> {code}
> a, b, c, d, e
> Set(a)
> class scala.collection.immutable.Set$Set1
> Set(a, b)
> class scala.collection.immutable.Set$Set2
> Set(a, b, c)
> class scala.collection.immutable.Set$Set3
> Set(a, b, c, d)
> class scala.collection.immutable.Set$Set4
> // we want (a, b, c, d, e)
> Set(e, a, b, c, d)
> class scala.collection.immutable.HashSet$HashTrieSet
> {code}
> We thought of 2 approaches to solve this problem:
> 1. Make the `ProjectionTranslator#identifyFieldReferences` method guarantee that 
> the order of the extracted field references is the same as the input order.
> 2. Add an input-to-output field mapping.
> In the end we solved the problem with approach #2, so this change is not strictly 
> necessary for the problem I faced. But I feel it is better to let the output of 
> this method be in the same order as the input; it may be helpful for other cases, 
> though I am currently not aware of any. I am OK with not making this change, but 
> then we should add a comment to highlight that the current output may be unordered. 
> Otherwise, some people may not pay attention to this and assume it is in order.
> Hi guys, what do you think? Any feedback is welcome.





[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...

2017-03-17 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3560#discussion_r106772134
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -227,18 +227,23 @@ object ProjectionTranslator {
    * @param exprs a list of expressions to extract
    * @return a list of field references extracted from the given expressions
    */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = {
-    exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = {
+    exprs.foldLeft(List[NamedExpression]()) {
       (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
-    }.toSeq
+    }
   }

   private def identifyFieldReferences(
       expr: Expression,
-      fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
+      fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match {
--- End diff --

The order really depends on how we extract fields from the various kinds of 
expressions. For a `BinaryExpression`, we first extract the `left child` and then 
the `right child`. For function calls, we extract the fields from the parameters 
in left-to-right order. A more complex example is `over`: imagine an aggregate on 
a partitioned window. Should the fields appearing in the aggregate or the fields 
being partitioned on be considered first?

So I think this kind of order is hard to define and hard to keep consistent; it 
will change easily whenever we modify the code. We should not rely on it for 
anything.




[jira] [Created] (FLINK-6106) Blob Server doesn't delete tmp fold when exit in HA mode.

2017-03-17 Thread Syinchwun Leo (JIRA)
Syinchwun Leo created FLINK-6106:


 Summary: Blob Server doesn't delete tmp fold when exit in HA mode.
 Key: FLINK-6106
 URL: https://issues.apache.org/jira/browse/FLINK-6106
 Project: Flink
  Issue Type: Wish
  Components: JobManager
Affects Versions: 1.2.0
Reporter: Syinchwun Leo


When started in HA mode, the Blob server does not register itself in a shutdown hook (lines 158-164):
{code}
if (highAvailabilityMode == HighAvailabilityMode.NONE) {
  // Add shutdown hook to delete storage directory
  this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
else {
  this.shutdownHook = null;
}
{code}
That means that when the application is killed in YARN, the tmp folder will not be deleted.
What is the purpose of this design?
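
For reference, what the non-HA branch effectively achieves is deleting the local storage directory on JVM exit. A minimal sketch (illustration only; BlobUtils.addShutdownHook itself is not shown here) could look like this:

{code}
// Sketch: register a JVM shutdown hook that recursively deletes a local
// temporary storage directory, roughly what the non-HA branch arranges.
def addCleanupShutdownHook(storageDir: java.io.File): Thread = {
  val hook = new Thread(new Runnable {
    override def run(): Unit = {
      def delete(f: java.io.File): Unit = {
        Option(f.listFiles()).foreach(_.foreach(delete)) // children first
        f.delete()
      }
      delete(storageDir)
    }
  })
  Runtime.getRuntime.addShutdownHook(hook)
  hook
}
{code}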





[jira] [Commented] (FLINK-6020) Blob Server cannot hanlde multiple job sumits(with same content) parallelly

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931016#comment-15931016
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
Right... I had the same thought at the beginning and tried to make the move 
atomic, but it has several side effects, like:
1. If we handle it this way, two jobs can share the same jar file in the blob 
server, which becomes a problem when one of them is canceled and deletes its 
jars (right now it seems the delete is not done, but it should be).
2. For job recovery (or other kinds of recovery, I'm not sure, I just observed 
the phenomenon) the blob server uploads jars to HDFS using the same name as the 
local file. Even if the two jobs share the same jar in the blob store, they will 
upload it twice at the same time, which causes a file lease conflict in HDFS.


> Blob Server cannot hanlde multiple job sumits(with same content) parallelly
> ---
>
> Key: FLINK-6020
> URL: https://issues.apache.org/jira/browse/FLINK-6020
> Project: Flink
>  Issue Type: Bug
>Reporter: Tao Wang
>Assignee: Tao Wang
>Priority: Critical
>
> In yarn-cluster mode, if we submit the same job multiple times in parallel, 
> the tasks will run into class loading problems and lease conflicts.
> The blob server stores user jars under names derived from their sha1sum: it 
> first writes a temp file and then moves it to its final location. For recovery 
> it also puts them into HDFS under the same file name.
> When multiple clients submit the same job with the same jar at the same time, 
> the local jar files in the blob server and the files on HDFS are handled by 
> multiple threads (BlobServerConnection) that interfere with each other.
> It would be better to have a way to handle this; two ideas come to mind:
> 1. lock the write operation, or
> 2. use some unique identifier as the file name instead of (or in addition to) 
> the sha1sum of the file contents.





[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...

2017-03-17 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
Right... I had the same thought at the beginning and tried to make the move 
atomic, but it has several side effects, like:
1. If we handle it this way, two jobs can share the same jar file in the blob 
server, which becomes a problem when one of them is canceled and deletes its 
jars (right now it seems the delete is not done, but it should be).
2. For job recovery (or other kinds of recovery, I'm not sure, I just observed 
the phenomenon) the blob server uploads jars to HDFS using the same name as the 
local file. Even if the two jobs share the same jar in the blob store, they will 
upload it twice at the same time, which causes a file lease conflict in HDFS.




[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930960#comment-15930960
 ] 

ASF GitHub Bot commented on FLINK-6097:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3560
  
Hi @KurtYoung, thanks for your attention to this PR. Good question; I'm glad to 
share why I noticed this method:
When we tried to implement the OVER window Table API, the first version of the 
prototype did not take into account that the table fields could be out of order 
when implementing the translateToPlan method; we set the outputRow fields from 
inputRow according to the initial order of the table field indexes.
As long as the select statement projected fewer than 5 columns this worked well. 
Unfortunately, when the number of projections was bigger than 4 (>= 5), we got 
random results. Debugging showed that the ProjectionTranslator#identifyFieldReferences 
method uses a Set to temporarily store the fields: when the number of elements 
in the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data 
structures; when the number of elements is greater than or equal to 5, the Set 
switches to HashSet#HashTrieSet, which causes the data to go out of order. So we 
thought of 2 approaches to solve this problem:

1. Make the ProjectionTranslator#identifyFieldReferences method guarantee that 
the order of the extracted field references is the same as the input order.
2. Add an input-to-output field mapping.

In the end we solved the problem with approach #2, so this change is not strictly 
necessary for the problem I faced. But I feel it is better to let the output of 
this method be in the same order as the input; it may be helpful for other cases, 
though I am currently not aware of any. I am OK with not making this change, but 
then we should add a comment to highlight that the current output may be 
unordered. Otherwise, some people may not pay attention to this and assume it is 
in order.

Thanks,
SunJincheng
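
For reference, the behavior described above is easy to reproduce; the following small snippet (illustration only) shows scala.collection.immutable.Set keeping insertion order in its specialized Set1..Set4 implementations and losing it once it switches to a HashTrieSet:

{code}
// immutable.Set uses specialized Set1..Set4 for up to 4 elements and switches
// to a HashSet (HashTrieSet) from 5 elements on, so iteration order is no
// longer guaranteed to be the insertion order.
val elems = Seq("a", "b", "c", "d", "e")
val sets  = elems.scanLeft(Set.empty[String])(_ + _).drop(1)
sets.foreach(s => println(s"${s.getClass.getName}: ${s.mkString(", ")}"))
// the 5-element set may print e.g. "e, a, b, c, d" instead of "a, b, c, d, e"
{code}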


> Guaranteed the order of the extracted field references
> --
>
> Key: FLINK-6097
> URL: https://issues.apache.org/jira/browse/FLINK-6097
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When we tried to implement the `OVER window` Table API, the first version of the 
> prototype did not take into account that the table fields could be out of order 
> when implementing the `translateToPlan` method; we set the `outputRow` fields 
> from `inputRow` according to the initial order of the table field indexes.
> As long as the select statement projected fewer than 5 columns this worked well. 
> Unfortunately, when the number of projections was bigger than 4 (>= 5), we got 
> random results. Debugging showed that the `ProjectionTranslator#identifyFieldReferences` 
> method uses a `Set` to temporarily store the fields: when the number of elements 
> in the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data structures; 
> when the number of elements is greater than or equal to 5, the Set switches to 
> HashSet#HashTrieSet, which causes the data to go out of order.
> e.g.:
> Add the following elements in turn:
> {code}
> a, b, c, d, e
> Set(a)
> class scala.collection.immutable.Set$Set1
> Set(a, b)
> class scala.collection.immutable.Set$Set2
> Set(a, b, c)
> class scala.collection.immutable.Set$Set3
> Set(a, b, c, d)
> class scala.collection.immutable.Set$Set4
> // we want (a, b, c, d, e)
> Set(e, a, b, c, d)
> class scala.collection.immutable.HashSet$HashTrieSet
> {code}
> We thought of 2 approaches to solve this problem:
> 1. Make the `ProjectionTranslator#identifyFieldReferences` method guarantee that 
> the order of the extracted field references is the same as the input order.
> 2. Add an input-to-output field mapping.
> In the end we solved the problem with approach #2, so this change is not strictly 
> necessary for the problem I faced. But I feel it is better to let the output of 
> this method be in the same order as the input; it may be helpful for other cases, 
> though I am currently not aware of any. I am OK with not making this change, but 
> then we should add a comment to highlight that the current output may be unordered. 
> Otherwise, some people may not pay attention to this and assume it is in order.
> Hi guys, what do you think? Any feedback is welcome.





[GitHub] flink issue #3560: [FLINK-6097][table] Guaranteed the order of the extracted...

2017-03-17 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3560
  
Hi @KurtYoung, thanks for your attention to this PR. Good question; I'm glad to 
share why I noticed this method:
When we tried to implement the OVER window Table API, the first version of the 
prototype did not take into account that the table fields could be out of order 
when implementing the translateToPlan method; we set the outputRow fields from 
inputRow according to the initial order of the table field indexes.
As long as the select statement projected fewer than 5 columns this worked well. 
Unfortunately, when the number of projections was bigger than 4 (>= 5), we got 
random results. Debugging showed that the ProjectionTranslator#identifyFieldReferences 
method uses a Set to temporarily store the fields: when the number of elements 
in the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data 
structures; when the number of elements is greater than or equal to 5, the Set 
switches to HashSet#HashTrieSet, which causes the data to go out of order. So we 
thought of 2 approaches to solve this problem:

1. Make the ProjectionTranslator#identifyFieldReferences method guarantee that 
the order of the extracted field references is the same as the input order.
2. Add an input-to-output field mapping.

In the end we solved the problem with approach #2, so this change is not strictly 
necessary for the problem I faced. But I feel it is better to let the output of 
this method be in the same order as the input; it may be helpful for other cases, 
though I am currently not aware of any. I am OK with not making this change, but 
then we should add a comment to highlight that the current output may be 
unordered. Otherwise, some people may not pay attention to this and assume it is 
in order.

Thanks,
SunJincheng




[jira] [Updated] (FLINK-6097) Guaranteed the order of the extracted field references

2017-03-17 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6097:
---
Description: 
When we tried to implement the `OVER window` Table API, the first version of the 
prototype did not take into account that the table fields could be out of order 
when implementing the `translateToPlan` method; we set the `outputRow` fields from 
`inputRow` according to the initial order of the table field indexes.
As long as the select statement projected fewer than 5 columns this worked well. 
Unfortunately, when the number of projections was bigger than 4 (>= 5), we got 
random results. Debugging showed that the `ProjectionTranslator#identifyFieldReferences` 
method uses a `Set` to temporarily store the fields: when the number of elements in 
the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data structures; 
when the number of elements is greater than or equal to 5, the Set switches to 
HashSet#HashTrieSet, which causes the data to go out of order.
e.g.:
Add the following elements in turn:
{code}
a, b, c, d, e
Set(a)
class scala.collection.immutable.Set$Set1
Set(a, b)
class scala.collection.immutable.Set$Set2
Set(a, b, c)
class scala.collection.immutable.Set$Set3
Set(a, b, c, d)
class scala.collection.immutable.Set$Set4
// we want (a, b, c, d, e)
Set(e, a, b, c, d)
class scala.collection.immutable.HashSet$HashTrieSet
{code}

So we thought of 2 approaches to solve this problem:

1. Make the `ProjectionTranslator#identifyFieldReferences` method guarantee that 
the order of the extracted field references is the same as the input order.
2. Add an input-to-output field mapping.

In the end we solved the problem with approach #2, so this change is not strictly 
necessary for the problem I faced. But I feel it is better to let the output of 
this method be in the same order as the input; it may be helpful for other cases, 
though I am currently not aware of any. I am OK with not making this change, but 
then we should add a comment to highlight that the current output may be 
unordered. Otherwise, some people may not pay attention to this and assume it is 
in order.
Hi guys, what do you think? Any feedback is welcome.


  was:
When we tried to implement the `OVER window` Table API, the first version of the 
prototype did not take into account that the table fields could be out of order 
when implementing the `translateToPlan` method; we set the `outputRow` fields from 
`inputRow` according to the initial order of the table field indexes.
As long as the select statement projected fewer than 5 columns this worked well. 
Unfortunately, when the number of projections was bigger than 4 (>= 5), we got 
random results. Debugging showed that the `ProjectionTranslator#identifyFieldReferences` 
method uses a `Set` to temporarily store the fields: when the number of elements in 
the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data structures; 
when the number of elements is greater than or equal to 5, the Set switches to 
HashSet#HashTrieSet, which causes the data to go out of order.
e.g.:
Add the following elements in turn:
{code}
a, b, c, d, e
Set(a)
class scala.collection.immutable.Set$Set1
Set(a, b)
class scala.collection.immutable.Set$Set2
Set(a, b, c)
class scala.collection.immutable.Set$Set3
Set(a, b, c, d)
class scala.collection.immutable.Set$Set4
// we want (a, b, c, d, e)
Set(e, a, b, c, d)
class scala.collection.immutable.HashSet$HashTrieSet
{code}

So we thought of 2 approaches to solve this problem:

1. Make the `ProjectionTranslator#identifyFieldReferences` method guarantee that 
the order of the extracted field references is the same as the input order.
2. Add an input-to-output field mapping.

In the end we solved the problem with approach #2, so this change is not strictly 
necessary for the problem I faced. But I feel it is better to let the output of 
this method be in the same order as the input; it may be helpful for other cases, 
though I am currently not aware of any. I am OK with not making this change, but 
then we should add a comment to highlight that the current output may be 
unordered. Otherwise, some people may not pay attention to this and assume it is 
in order, like me.
Hi guys, what do you think? Any feedback is welcome.



> Guaranteed the order of the extracted field references
> --
>
> Key: FLINK-6097
> URL: https://issues.apache.org/jira/browse/FLINK-6097
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When we tried to implement the `OVER window` Table API, the first version of the 
> prototype did not take into account that the table fields could be out of order 
> when implementing the `translateToPlan` method; we set the `outputRow` fields 
> 

[jira] [Updated] (FLINK-6097) Guaranteed the order of the extracted field references

2017-03-17 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6097:
---
Description: 
When we tried to implement the `OVER window` Table API, the first version of the 
prototype did not take into account that the table fields could be out of order 
when implementing the `translateToPlan` method; we set the `outputRow` fields from 
`inputRow` according to the initial order of the table field indexes.
As long as the select statement projected fewer than 5 columns this worked well. 
Unfortunately, when the number of projections was bigger than 4 (>= 5), we got 
random results. Debugging showed that the `ProjectionTranslator#identifyFieldReferences` 
method uses a `Set` to temporarily store the fields: when the number of elements in 
the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data structures; 
when the number of elements is greater than or equal to 5, the Set switches to 
HashSet#HashTrieSet, which causes the data to go out of order.
e.g.:
Add the following elements in turn:
{code}
a, b, c, d, e
Set(a)
class scala.collection.immutable.Set$Set1
Set(a, b)
class scala.collection.immutable.Set$Set2
Set(a, b, c)
class scala.collection.immutable.Set$Set3
Set(a, b, c, d)
class scala.collection.immutable.Set$Set4
// we want (a, b, c, d, e)
Set(e, a, b, c, d)
class scala.collection.immutable.HashSet$HashTrieSet
{code}

So we thought of 2 approaches to solve this problem:

1. Make the `ProjectionTranslator#identifyFieldReferences` method guarantee that 
the order of the extracted field references is the same as the input order.
2. Add an input-to-output field mapping.

In the end we solved the problem with approach #2, so this change is not strictly 
necessary for the problem I faced. But I feel it is better to let the output of 
this method be in the same order as the input; it may be helpful for other cases, 
though I am currently not aware of any. I am OK with not making this change, but 
then we should add a comment to highlight that the current output may be 
unordered. Otherwise, some people may not pay attention to this and assume it is 
in order, like me.
Hi guys, what do you think? Any feedback is welcome.


  was:
The current `ProjectionTranslator#identifyFieldReferences` method uses a `Set` to 
temporarily store the fields. When the number of elements in the Set is less than 
5, the Set uses the Set1, Set2, Set3, Set4 data structures; when the number of 
elements is greater than or equal to 5, the Set switches to HashSet#HashTrieSet, 
which causes the data to go out of order. Although the out-of-order result still 
works, I think an ordered result is better, so I want to improve this: extract 
the fields in order, i.e. guarantee that the order of the extracted field 
references matches the input order.
e.g.:
Add the following elements in turn:
{code}
a, b, c, d, e
Set(a)
class scala.collection.immutable.Set$Set1
Set(a, b)
class scala.collection.immutable.Set$Set2
Set(a, b, c)
class scala.collection.immutable.Set$Set3
Set(a, b, c, d)
class scala.collection.immutable.Set$Set4

Set(e, a, b, c, d) -> I want (a, b, c, d, e)
class scala.collection.immutable.HashSet$HashTrieSet
{code}


> Guaranteed the order of the extracted field references
> --
>
> Key: FLINK-6097
> URL: https://issues.apache.org/jira/browse/FLINK-6097
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When we tried to implement the `OVER window` Table API, the first version of the 
> prototype did not take into account that the table fields could be out of order 
> when implementing the `translateToPlan` method; we set the `outputRow` fields 
> from `inputRow` according to the initial order of the table field indexes.
> As long as the select statement projected fewer than 5 columns this worked well. 
> Unfortunately, when the number of projections was bigger than 4 (>= 5), we got 
> random results. Debugging showed that the `ProjectionTranslator#identifyFieldReferences` 
> method uses a `Set` to temporarily store the fields: when the number of elements 
> in the Set is less than 5, the Set uses the Set1, Set2, Set3, Set4 data structures; 
> when the number of elements is greater than or equal to 5, the Set switches to 
> HashSet#HashTrieSet, which causes the data to go out of order.
> e.g.:
> Add the following elements in turn:
> {code}
> a, b, c, d, e
> Set(a)
> class scala.collection.immutable.Set$Set1
> Set(a, b)
> class scala.collection.immutable.Set$Set2
> Set(a, b, c)
> class scala.collection.immutable.Set$Set3
> Set(a, b, c, d)
> class scala.collection.immutable.Set$Set4
> // we want (a, b, c, d, e)
> Set(e, a, b, c, d)
> class scala.collection.immutable.HashSet$

[GitHub] flink issue #3560: [FLINK-6097][table] Guaranteed the order of the extracted...

2017-03-17 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3560
  
Hi @KurtYoung, thanks for your attention to this PR. Good question; the purpose 
of this change is exactly as described in the JIRA:
I want `ProjectionTranslator#identifyFieldReferences` to guarantee that the order 
of the extracted field references is the same as the input order. The `Set` is 
only used to eliminate duplicate field references, and a `List` can do that as 
well, so there is no harm in keeping the method's output order consistent with 
the input.
If the unordered results of `ProjectionTranslator#identifyFieldReferences` work, 
then the ordered results must also work, because an ordered result is just one 
particular case of an unordered one.
What do you think?

Thanks,
SunJincheng






[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930890#comment-15930890
 ] 

ASF GitHub Bot commented on FLINK-6097:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3560
  
Hi @KurtYoung, thanks for your attention to this PR. Good question; the purpose 
of this change is exactly as described in the JIRA:
I want `ProjectionTranslator#identifyFieldReferences` to guarantee that the order 
of the extracted field references is the same as the input order. The `Set` is 
only used to eliminate duplicate field references, and a `List` can do that as 
well, so there is no harm in keeping the method's output order consistent with 
the input.
If the unordered results of `ProjectionTranslator#identifyFieldReferences` work, 
then the ordered results must also work, because an ordered result is just one 
particular case of an unordered one.
What do you think?

Thanks,
SunJincheng




> Guaranteed the order of the extracted field references
> --
>
> Key: FLINK-6097
> URL: https://issues.apache.org/jira/browse/FLINK-6097
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The current `ProjectionTranslator#identifyFieldReferences` method uses a `Set` 
> to temporarily store the fields. When the number of elements in the Set is less 
> than 5, the Set uses the Set1, Set2, Set3, Set4 data structures; when the number 
> of elements is greater than or equal to 5, the Set switches to 
> HashSet#HashTrieSet, which causes the data to go out of order. Although the 
> out-of-order result still works, I think an ordered result is better, so I want 
> to improve this: extract the fields in order, i.e. guarantee that the order of 
> the extracted field references matches the input order.
> e.g.:
> Add the following elements in turn:
> {code}
> a, b, c, d, e
> Set(a)
> class scala.collection.immutable.Set$Set1
> Set(a, b)
> class scala.collection.immutable.Set$Set2
> Set(a, b, c)
> class scala.collection.immutable.Set$Set3
> Set(a, b, c, d)
> class scala.collection.immutable.Set$Set4
> Set(e, a, b, c, d) -> I want (a, b, c, d, e)
> class scala.collection.immutable.HashSet$HashTrieSet
> {code}





[jira] [Commented] (FLINK-6097) Guaranteed the order of the extracted field references

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930874#comment-15930874
 ] 

ASF GitHub Bot commented on FLINK-6097:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3560#discussion_r106761581
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -227,18 +227,23 @@ object ProjectionTranslator {
    * @param exprs a list of expressions to extract
    * @return a list of field references extracted from the given expressions
    */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = {
-    exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = {
+    exprs.foldLeft(List[NamedExpression]()) {
       (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
-    }.toSeq
+    }
   }

   private def identifyFieldReferences(
       expr: Expression,
-      fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
+      fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match {
--- End diff --

Can you explain more about ` Especially the order is not defined very well, 
it will change easily if we modify the code.`?


> Guaranteed the order of the extracted field references
> --
>
> Key: FLINK-6097
> URL: https://issues.apache.org/jira/browse/FLINK-6097
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The current `ProjectionTranslator#identifyFieldReferences` method uses a `Set` 
> to temporarily store the fields. When the number of elements in the Set is less 
> than 5, the Set uses the Set1, Set2, Set3, Set4 data structures; when the number 
> of elements is greater than or equal to 5, the Set switches to 
> HashSet#HashTrieSet, which causes the data to go out of order. Although the 
> out-of-order result still works, I think an ordered result is better, so I want 
> to improve this: extract the fields in order, i.e. guarantee that the order of 
> the extracted field references matches the input order.
> e.g.:
> Add the following elements in turn:
> {code}
> a, b, c, d, e
> Set(a)
> class scala.collection.immutable.Set$Set1
> Set(a, b)
> class scala.collection.immutable.Set$Set2
> Set(a, b, c)
> class scala.collection.immutable.Set$Set3
> Set(a, b, c, d)
> class scala.collection.immutable.Set$Set4
> Set(e, a, b, c, d) -> I want (a, b, c, d, e)
> class scala.collection.immutable.HashSet$HashTrieSet
> {code}





[GitHub] flink pull request #3560: [FLINK-6097][table] Guaranteed the order of the ex...

2017-03-17 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3560#discussion_r106761581
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -227,18 +227,23 @@ object ProjectionTranslator {
    * @param exprs a list of expressions to extract
    * @return a list of field references extracted from the given expressions
    */
-  def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = {
-    exprs.foldLeft(Set[NamedExpression]()) {
+  def extractFieldReferences(exprs: Seq[Expression]): List[NamedExpression] = {
+    exprs.foldLeft(List[NamedExpression]()) {
       (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
-    }.toSeq
+    }
   }

   private def identifyFieldReferences(
       expr: Expression,
-      fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
+      fieldReferences: List[NamedExpression]): List[NamedExpression] = expr match {
--- End diff --

Can you explain more about ` Especially the order is not defined very well, 
it will change easily if we modify the code.`?




[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930812#comment-15930812
 ] 

ASF GitHub Bot commented on FLINK-5808:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/3509


> Missing verification for setParallelism and setMaxParallelism
> -
>
> Key: FLINK-5808
> URL: https://issues.apache.org/jira/browse/FLINK-5808
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> When {{setParallelism()}} is called we don't verify that it is <= the max 
> parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check 
> that the new value doesn't clash with a previously set parallelism.
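
As a rough illustration of the missing checks (a hedged sketch only, not the DataStream API code; the class and field names are made up), the two setters could validate against each other like this:

{code}
// Sketch: parallelism / maxParallelism stored as Ints, -1 meaning "unset".
class ParallelismSettings {
  private var parallelism: Int = -1
  private var maxParallelism: Int = -1

  def setParallelism(p: Int): Unit = {
    require(p > 0, "Parallelism must be positive.")
    require(maxParallelism == -1 || p <= maxParallelism,
      s"Parallelism $p exceeds the configured max parallelism $maxParallelism.")
    parallelism = p
  }

  def setMaxParallelism(mp: Int): Unit = {
    require(mp > 0, "Max parallelism must be positive.")
    require(parallelism == -1 || mp >= parallelism,
      s"Max parallelism $mp clashes with the previously set parallelism $parallelism.")
    maxParallelism = mp
  }
}
{code}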





[GitHub] flink pull request #3509: [FLINK-5808] Fix Missing verification for setParal...

2017-03-17 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/3509




[jira] [Commented] (FLINK-6020) Blob Server cannot hanlde multiple job sumits(with same content) parallelly

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930495#comment-15930495
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
I think we should then fix this in the blob server.

The problem that only one should succeed upon collision should be fixable 
by using `Files.move()` with `ATOMIC_MOVE`. Only when that succeeds, we store 
the file in the blob store.

What do you think?
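
A minimal sketch of that idea (illustration only, not the BlobServer code; the method and parameter names are made up): the finished temp file is moved into place atomically, and a collision is treated as "another writer already finalized an identical blob":

{code}
import java.io.IOException
import java.nio.file.{Files, Path, StandardCopyOption}

// Sketch: atomically move the finished temp file to its final, content-addressed name.
// Whether a pre-existing target is replaced or causes an IOException is file-system
// specific, so a failure here is treated as losing the race to an identical blob.
def finalizeBlob(tempFile: Path, targetFile: Path): Boolean = {
  try {
    Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE)
    true
  } catch {
    case _: IOException =>
      Files.deleteIfExists(tempFile) // discard our copy; the content is identical
      false
  }
}
{code}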


> Blob Server cannot hanlde multiple job sumits(with same content) parallelly
> ---
>
> Key: FLINK-6020
> URL: https://issues.apache.org/jira/browse/FLINK-6020
> Project: Flink
>  Issue Type: Bug
>Reporter: Tao Wang
>Assignee: Tao Wang
>Priority: Critical
>
> In yarn-cluster mode, if we submit the same job multiple times in parallel, 
> the tasks will run into class loading problems and lease conflicts.
> The blob server stores user jars under names derived from their sha1sum: it 
> first writes a temp file and then moves it to its final location. For recovery 
> it also puts them into HDFS under the same file name.
> When multiple clients submit the same job with the same jar at the same time, 
> the local jar files in the blob server and the files on HDFS are handled by 
> multiple threads (BlobServerConnection) that interfere with each other.
> It would be better to have a way to handle this; two ideas come to mind:
> 1. lock the write operation, or
> 2. use some unique identifier as the file name instead of (or in addition to) 
> the sha1sum of the file contents.





[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3525
  
I think we should then fix this in the blob server.

The problem that only one should succeed upon collision should be fixable 
by using `Files.move()` with `ATOMIC_MOVE`. Only when that succeeds, we store 
the file in the blob store.

What do you think?




[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106717641
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

How about calling it `ROW` and `ROW_NAMED` or so? I think just adding 
another parameter is hacky...
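
A hedged sketch of what that suggestion could look like (the names and exact signatures are assumptions, not the final API):

{code}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo

object TypesSketch {
  // Assumed sketch: two explicit factory methods instead of one method with an
  // extra names parameter.
  def ROW(types: TypeInformation[_]*): RowTypeInfo =
    new RowTypeInfo(types: _*)

  def ROW_NAMED(names: Array[String], types: TypeInformation[_]*): RowTypeInfo =
    new RowTypeInfo(types.toArray, names)
}
{code}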




[jira] [Commented] (FLINK-5481) Simplify Row creation

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930439#comment-15930439
 ] 

ASF GitHub Bot commented on FLINK-5481:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106717641
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

How about calling it `ROW` and `ROW_NAMED` or so? I think just adding 
another parameter is hacky...


> Simplify Row creation
> -
>
> Key: FLINK-5481
> URL: https://issues.apache.org/jira/browse/FLINK-5481
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Trivial
>
> When we use {{ExecutionEnvironment#fromElements(X... data)}} it takes the first 
> element of {{data}} to determine the type. If the first Row in the collection 
> has the wrong number of fields (because there are nulls), the TypeExtractor 
> returns a GenericType instead of a RowTypeInfo.





[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
One more question: Can the StateRegistry not directly drop states that have 
no reference any more when states are unregistered? Is there a special reason 
for first collecting these states in a list, then getting them and then 
dropping them?




[jira] [Commented] (FLINK-6014) Allow the registration of state objects in checkpoints

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930426#comment-15930426
 ] 

ASF GitHub Bot commented on FLINK-6014:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
One more question: Can the StateRegistry not directly drop states that have 
no reference any more when states are unregistered? Is there a special reason 
for first collecting these states in a list, then getting them and then 
dropping them?


> Allow the registration of state objects in checkpoints
> --
>
> Key: FLINK-6014
> URL: https://issues.apache.org/jira/browse/FLINK-6014
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> This issue is the very first step towards incremental checkpointing. We 
> introduce a new state handle named {{CompositeStateHandle}} to be the base of 
> the snapshots taken by task components. Known implementations may include 
> {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for 
> subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s).
> Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. 
> It should register all its state objects in {{StateRegistry}} when its 
> checkpoint is added into {{CompletedCheckpointStore}} (i.e., a pending 
> checkpoint completes or a complete checkpoint is reloaded in the recovery). 
> When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, 
> we should not simply discard all state objects in the checkpoint. With the 
> introduction of incremental checkpointing, a {{StateObject}} may be 
> referenced by different checkpoints. We should unregister all the state 
> objects contained in the {{StateRegistry}} first. Only those state objects 
> that are not referenced by any checkpoint can be deleted.
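
As a rough illustration of the reference-counting idea in the description (a hedged sketch under assumed simplifications, not the proposed StateRegistry API), registration increments a per-object count and unregistration only reports an object as discardable once no checkpoint references it anymore:

{code}
import scala.collection.mutable

// Hypothetical sketch: count how many checkpoints reference each shared state object.
class RefCountingRegistry[T] {
  private val refCounts = mutable.Map.empty[T, Int].withDefaultValue(0)

  def register(state: T): Unit =
    refCounts(state) = refCounts(state) + 1

  // Returns true if the state is no longer referenced and may be discarded.
  def unregister(state: T): Boolean = {
    val remaining = refCounts(state) - 1
    if (remaining <= 0) { refCounts.remove(state); true }
    else { refCounts(state) = remaining; false }
  }
}
{code}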





[jira] [Commented] (FLINK-6014) Allow the registration of state objects in checkpoints

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930423#comment-15930423
 ] 

ASF GitHub Bot commented on FLINK-6014:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
Thanks for opening this pull request. Adding a `CompositeStateHandle` and a 
`StateRegistry` is a good idea.

Some thoughts:

  - What do you think about making the `StateRegistry` into a 
`SharedStateRegistry` which only contains the handles to state that is shared 
across checkpoints? State that is exclusive to a checkpoint is not handled by 
that registry, but remains only in the checkpoint. That way we "isolate" the 
existing behavior against the coming changes and do not risk regressions in the 
state cleanup code (which is very critical for current users).

  - Another reason for the above suggestion is to also bring some other 
code into place that has some "fast paths" and "safety nets" for checkpoint 
cleanups (currently only with non-shared state), for example dropping a 
checkpoint simply by a `rm -r` (see https://github.com/apache/flink/pull/3522 
). We have seen that for various users the state cleanup problems are among the 
biggest problems they have, which we can address very well with the work 
started in the above linked pull request. These things would work together 
seamlessly if the registry deals only with shared state handles.

  - I am wondering if it is easier to put the registry into the checkpoint 
coordinator rather than the checkpoint stores. That way we need the code that 
deals with adding / failure handling / etc only once.



> Allow the registration of state objects in checkpoints
> --
>
> Key: FLINK-6014
> URL: https://issues.apache.org/jira/browse/FLINK-6014
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> This issue is the very first step towards incremental checkpointing. We 
> introduce a new state handle named {{CompositeStateHandle}} to be the base of 
> the snapshots taken by task components. Known implementations may include 
> {{KeyedStateHandle}} (for {{KeyedStateBackend}}s), {{SubtaskState}} (for 
> subtasks, splits of {{JobVertex}}) and {{TaskState}} (for {{JobVertex}}s).
> Each {{CompositeStateHandle}} is composed of a collection of {{StateObject}}s. 
> It should register all its state objects in {{StateRegistry}} when its 
> checkpoint is added into {{CompletedCheckpointStore}} (i.e., a pending 
> checkpoint completes or a complete checkpoint is reloaded in the recovery). 
> When a completed checkpoint is moved out of the {{CompletedCheckpointStore}}, 
> we should not simply discard all state objects in the checkpoint. With the 
> introduction of incremental checkpointing, a {{StateObject}} may be 
> referenced by different checkpoints. We should unregister all the state 
> objects contained in the {{StateRegistry}} first. Only those state objects 
> that are not referenced by any checkpoint can be deleted.





[GitHub] flink issue #3524: [FLINK-6014][checkpoint] Allow the registration of state ...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3524
  
Thanks for opening this pull request. Adding a `CompositeStateHandle` and a 
`StateRegistry` is a good idea.

Some thoughts:

  - What do you think about making the `StateRegistry` into a 
`SharedStateRegistry` which only contains the handles to state that is shared 
across checkpoints? State that is exclusive to a checkpoint is not handled by 
that registry, but remains only in the checkpoint. That way we "isolate" the 
existing behavior against the coming changes and do not risk regressions in the 
state cleanup code (which is very critical for current users).

  - Another reason for the above suggestion is to also bring some other 
code into place that has some "fast paths" and "safety nets" for checkpoint 
cleanups (currently only with non-shared state), for example dropping a 
checkpoint simply by a `rm -r` (see https://github.com/apache/flink/pull/3522 
). We have seen that for various users the state cleanup problems are among the 
biggest problems they have, which we can address very well with the work 
started in the above linked pull request. These things would work together 
seamlessly if the registry deals only with shared state handles.

  - I am wondering if it is easier to put the registry into the checkpoint 
coordinator rather than the checkpoint stores. That way we need the code that 
deals with adding / failure handling / etc only once.





[GitHub] flink issue #2029: [FLINK-2814] Fix for DualInputPlanNode cannot be cast to ...

2017-03-17 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2029
  
I created #3563 which combines this PR and my suggestion.




[GitHub] flink issue #3550: [FLINK-5654] - Add processing time OVER RANGE BETWEEN x P...

2017-03-17 Thread rtudoran
Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3550
  
@fhueske - Thank you very much for your review, it was very useful. I integrated 
and addressed most of the remarks you made.
What remains to be discussed and is not yet addressed:
- using the processing function vs. keeping the window-based implementation 
(which, as mentioned, I prefer and believe is more appropriate here)
- removing the merge commit... as per my previous comment I do not know exactly 
what to do about that





[jira] [Commented] (FLINK-6050) Improve failure reporting when using Future.thenAccept

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930381#comment-15930381
 ] 

ASF GitHub Bot commented on FLINK-6050:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3537
  
I think this is good, +1

Do we have a test that validates that completing a `Future` exceptionally 
also completes all result Futures of `thenApply` (or `thenApplyAsync`) 
functions with an exception?


> Improve failure reporting when using Future.thenAccept
> --
>
> Key: FLINK-6050
> URL: https://issues.apache.org/jira/browse/FLINK-6050
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> When applying {{Future.thenAccept(Async)}} onto a {{Future}}, we should 
> register the exception handler on the returned {{Future}} and not on the 
> original future. This has the advantage that we also catch exceptions which 
> are thrown in the {{AcceptFunction}} and not only those originating from the 
> original {{Future}}. This improves Flink's behaviour, because exceptions are 
> then not swallowed in the returned {{Future}}.
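
The same principle can be illustrated with the standard java.util.concurrent.CompletableFuture (an analogy only; Flink's own Future interface is what the change actually touches): registering the handler on the stage returned by the accept call also catches exceptions thrown inside the accept function itself:

{code}
import java.util.concurrent.CompletableFuture
import java.util.function.{Consumer, Function => JFunction}

val source = new CompletableFuture[String]()

// Register the exception handler on the *returned* stage, not on `source`,
// so an exception thrown inside the accept callback is reported as well.
val accepted = source.thenAccept(new Consumer[String] {
  override def accept(v: String): Unit =
    if (v.isEmpty) throw new IllegalArgumentException("empty value")
})
accepted.exceptionally(new JFunction[Throwable, Void] {
  override def apply(t: Throwable): Void = { println(s"stage failed: $t"); null }
})

source.complete("")  // completes the source; the accept stage fails and is handled
{code}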





[GitHub] flink issue #3537: [FLINK-6050] [robustness] Register exception handler on t...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3537
  
I think this is good, +1

Do we have a test that validates that completing a `Future` exceptionally 
also completes all result Futures of `thenApply` (or `thenApplyAsync`) 
functions with an exception?
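For reference, the behaviour in question is easy to check against java.util.concurrent.CompletableFuture; whether Flink's own Future implementation behaves the same is exactly what such a test would need to assert (names below are illustrative only):

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionalCompletionSketch {
    public static void main(String[] args) {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApply(i -> "value " + i);

        source.completeExceptionally(new RuntimeException("boom"));

        // The dependent future completes exceptionally as well; join() surfaces the
        // failure wrapped in a CompletionException.
        try {
            dependent.join();
            throw new AssertionError("expected the dependent future to fail");
        } catch (CompletionException e) {
            System.out.println("propagated: " + e.getCause().getMessage());
        }
    }
}
{code}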




[jira] [Commented] (FLINK-2814) DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930377#comment-15930377
 ] 

ASF GitHub Bot commented on FLINK-2814:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2029
  
I created #3563 which combines this PR and my suggestion.


> DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode
> ---
>
> Key: FLINK-2814
> URL: https://issues.apache.org/jira/browse/FLINK-2814
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.0
>Reporter: Greg Hogan
>Assignee: Rekha Joshi
>
> A delta iteration that closes with a solution set which is a {{JoinOperator}} 
> throws the following exception:
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:444)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:345)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:289)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:969)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1019)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.optimizer.plan.DualInputPlanNode cannot be cast to 
> org.apache.flink.optimizer.plan.SingleInputPlanNode
>   at 
> org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:432)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
>   at 
> org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:271)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:543)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:350)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:796)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:424)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1365)
>   at Driver.main(Driver.java:366)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:429)
>   ... 6 more
> {noformat}
> Temporary fix is to attach an identity mapper.





[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3521
  
Actually, do you think you could add a test for this? Would be good to 
guard that for the future...




[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930372#comment-15930372
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/3550
  
@fhueske - Thank you very much for your review. It was very useful. I 
integrated and addressed most of the remarks you made.
What remains to be discussed and is not yet addressed:
- using the processingFunction vs. keeping the Window-based implementation (I 
prefer the Window-based one and believe it is more appropriate here, as mentioned)
- removing the merge commit... as per my previous comment, I do not know 
exactly what to do about that



> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).





[jira] [Commented] (FLINK-2814) DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930371#comment-15930371
 ] 

ASF GitHub Bot commented on FLINK-2814:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3563

[FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to 
SingleInputPlanNode

WorksetIterationNode#instantiate loops over all solution and work set 
candidates. Since the solution set reference is modified in place when the 
predecessor node can be used in its place, switch this variable to the inner 
loop.

@StephanEwen this is similar to #2029 but resets the reference in the loop. 
I believe my prior suggestion to immediately return upon adding a node was 
incorrect as the `instantiate` methods look to be compiling all valid 
combinations.

IntelliJ code coverage on `flink-optimizer` shows 105 hits through 
`WorksetIterationNode#instantiate` and it does fix this issue with my Katz 
Centrality algorithm (which should not be using delta iterations, but I was 
young and naive when I wrote it).
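Schematically, the pattern being fixed looks like the following self-contained sketch; the types and names are invented for illustration, and the real change lives in `WorksetIterationNode#instantiate`:

{code}
import java.util.Arrays;
import java.util.List;

public class InnerLoopResetSketch {

    // Hypothetical stand-in for a plan node that may be replaced by its predecessor.
    static final class Node {
        final String name;
        final Node predecessor;
        Node(String name, Node predecessor) { this.name = name; this.predecessor = predecessor; }
    }

    public static void main(String[] args) {
        Node predecessor = new Node("predecessor", null);
        List<Node> solutionSetCandidates = Arrays.asList(new Node("solutionSet", predecessor));
        List<String> worksetCandidates = Arrays.asList("workset-1", "workset-2");

        for (Node solutionSetTemplate : solutionSetCandidates) {
            for (String workset : worksetCandidates) {
                // Reset per combination: when this variable lived outside the inner
                // loop, the in-place replacement below leaked into later iterations.
                Node solutionSet = solutionSetTemplate;
                if (solutionSet.predecessor != null) {
                    solutionSet = solutionSet.predecessor; // "use the predecessor in its place"
                }
                System.out.println("instantiate(" + solutionSet.name + ", " + workset + ")");
            }
        }
    }
}
{code}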

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
2814_deltaiteration_dualinputplannode_cannot_be_cast_to_singleinputplannode

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3563


commit 34f017834e17fa69e2b7c72bd95e1a819e4e6aa3
Author: Greg Hogan 
Date:   2017-03-17T16:09:34Z

[FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to 
SingleInputPlanNode

WorksetIterationNode#instantiate loops over all solution and work set
candidates. Since the solution set reference is modified in place when
the predecessor node can be used in its place, switch this variable to
the inner loop.




> DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode
> ---
>
> Key: FLINK-2814
> URL: https://issues.apache.org/jira/browse/FLINK-2814
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.10.0
>Reporter: Greg Hogan
>Assignee: Rekha Joshi
>
> A delta iteration that closes with a solution set which is a {{JoinOperator}} 
> throws the following exception:
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:444)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:345)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:289)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:969)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1019)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.optimizer.plan.DualInputPlanNode cannot be cast to 
> org.apache.flink.optimizer.plan.SingleInputPlanNode
>   at 
> org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:432)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
>   at 
> org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
>   at 
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:271)
>   at 
> org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:543)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:350)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:796)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:424)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1365)
>   at Driver.main(Driver.java:366)
>   at 

[GitHub] flink pull request #3563: [FLINK-2814] [optimizer] DualInputPlanNode cannot ...

2017-03-17 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/3563

[FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to 
SingleInputPlanNode

WorksetIterationNode#instantiate loops over all solution and work set 
candidates. Since the solution set reference is modified in place when the 
predecessor node can be used in its place, switch this variable to the inner 
loop.

@StephanEwen this is similar to #2029 but resets the reference in the loop. 
I believe my prior suggestion to immediately return upon adding a node was 
incorrect as the `instantiate` methods look to be compiling all valid 
combinations.

IntelliJ code coverage on `flink-optimizer` shows 105 hits through 
`WorksetIterationNode#instantiate` and it does fix this issue with my Katz 
Centrality algorithm (which should not be using delta iterations, but I was 
young and naive when I wrote it).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
2814_deltaiteration_dualinputplannode_cannot_be_cast_to_singleinputplannode

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3563


commit 34f017834e17fa69e2b7c72bd95e1a819e4e6aa3
Author: Greg Hogan 
Date:   2017-03-17T16:09:34Z

[FLINK-2814] [optimizer] DualInputPlanNode cannot be cast to 
SingleInputPlanNode

WorksetIterationNode#instantiate loops over all solution and work set
candidates. Since the solution set reference is modified in place when
the predecessor node can be used in its place, switch this variable to
the inner loop.






[GitHub] flink issue #3526: [FLINK-5999] [resMgnr] Move JobLeaderIdService shut down ...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3526
  
Looks good to me, +1 to merge




[jira] [Commented] (FLINK-5999) MiniClusterITCase.runJobWithMultipleRpcServices fails

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930363#comment-15930363
 ] 

ASF GitHub Bot commented on FLINK-5999:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3526
  
Looks good to me, +1 to merge


> MiniClusterITCase.runJobWithMultipleRpcServices fails
> -
>
> Key: FLINK-5999
> URL: https://issues.apache.org/jira/browse/FLINK-5999
> Project: Flink
>  Issue Type: Test
>  Components: Distributed Coordination, Tests
>Reporter: Ufuk Celebi
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> In a branch with unrelated changes to the web frontend I've seen the 
> following test fail:
> {code}
> runJobWithMultipleRpcServices(org.apache.flink.runtime.minicluster.MiniClusterITCase)
>   Time elapsed: 1.145 sec  <<< ERROR!
> java.util.ConcurrentModificationException: null
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.flink.runtime.resourcemanager.JobLeaderIdService.clear(JobLeaderIdService.java:114)
>   at 
> org.apache.flink.runtime.resourcemanager.JobLeaderIdService.stop(JobLeaderIdService.java:92)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.shutDown(ResourceManager.java:182)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManagerRunner.shutDownInternally(ResourceManagerRunner.java:83)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManagerRunner.shutDown(ResourceManagerRunner.java:78)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.shutdownInternally(MiniCluster.java:313)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.shutdown(MiniCluster.java:281)
>   at 
> org.apache.flink.runtime.minicluster.MiniClusterITCase.runJobWithMultipleRpcServices(MiniClusterITCase.java:72)
> {code}
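For context, the ConcurrentModificationException in the trace above is the standard symptom of a HashMap being modified while one of its iterators is in flight. The sketch below only illustrates the symptom and the iterator-based removal pattern; the actual fix in #3526 moves the JobLeaderIdService shutdown, as its title says.

{code}
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ClearWhileIteratingSketch {
    public static void main(String[] args) {
        Map<String, Runnable> services = new HashMap<>();
        services.put("job-1", () -> { });
        services.put("job-2", () -> { });

        // Symptom: structurally modifying the map while iterating its values throws
        // ConcurrentModificationException on the next iterator step.
        try {
            for (Runnable service : services.values()) {
                service.run();
                services.remove("job-1");
            }
        } catch (java.util.ConcurrentModificationException e) {
            System.out.println("reproduced: " + e);
        }

        // Safe variant: remove through the iterator itself.
        for (Iterator<Runnable> it = services.values().iterator(); it.hasNext(); ) {
            it.next().run();
            it.remove();
        }
        System.out.println("remaining: " + services.size());
    }
}
{code}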





[jira] [Commented] (FLINK-6027) Ignore the exception thrown by the subsuming of old completed checkpoints

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930360#comment-15930360
 ] 

ASF GitHub Bot commented on FLINK-6027:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3521
  
Actually, do you think you could add a test for this? Would be good to 
guard that for the future...


> Ignore the exception thrown by the subsuming of old completed checkpoints
> -
>
> Key: FLINK-6027
> URL: https://issues.apache.org/jira/browse/FLINK-6027
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> When a checkpoint is added into the {{CompletedCheckpointStore}} via the 
> method {{addCheckpoint()}}, the oldest checkpoints will be removed from the 
> store if the number of stored checkpoints exceeds the given limit. The 
> subsuming of old checkpoints may fail and make {{addCheckpoint()}} throw 
> exceptions which are caught by {{CheckpointCoordinator}}. Finally, the states 
> in the new checkpoint will be deleted by {{CheckpointCoordinator}}. Because 
> the new checkpoint is still in the store, we may recover the job from the new 
> checkpoint. But the recovery will fail as the states of the checkpoint are 
> all deleted.
> We should ignore the exceptions thrown by the subsuming of old checkpoints 
> because we can always recover from the new checkpoint when successfully 
> adding it into the store. The ignorance may produce some dirty data, but it's 
> acceptable because they can be cleaned with the cleanup hook introduced in 
> the near future.
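A toy sketch of the intended behaviour follows; this is not the actual CompletedCheckpointStore API, and the class and method names are invented:

{code}
import java.util.ArrayDeque;
import java.util.Deque;

public class SubsumeIgnoringStoreSketch {

    private final Deque<String> retained = new ArrayDeque<>();
    private final int maxRetained;

    SubsumeIgnoringStoreSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    // The newly added checkpoint stays usable no matter whether discarding the
    // subsumed one succeeds, so clean-up failures are logged and swallowed.
    void addCheckpoint(String checkpoint) {
        retained.addLast(checkpoint);
        while (retained.size() > maxRetained) {
            String subsumed = retained.removeFirst();
            try {
                discard(subsumed);
            } catch (Exception e) {
                System.err.println("Could not discard subsumed checkpoint " + subsumed
                        + ", ignoring: " + e.getMessage());
            }
        }
    }

    private void discard(String checkpoint) throws Exception {
        if (checkpoint.endsWith("broken")) {
            throw new Exception("simulated storage error");
        }
    }

    public static void main(String[] args) {
        SubsumeIgnoringStoreSketch store = new SubsumeIgnoringStoreSketch(1);
        store.addCheckpoint("chk-1-broken");
        store.addCheckpoint("chk-2"); // discarding chk-1 fails, but chk-2 is still retained
        System.out.println("latest retained: " + store.retained.peekLast());
    }
}
{code}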





[jira] [Updated] (FLINK-5376) Misleading log statements in UnorderedStreamElementQueue

2017-03-17 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5376:
--
Description: 
The following are two examples where ordered stream element queue is mentioned:

{code}
LOG.debug("Put element into ordered stream element queue. New filling 
degree " +
  "({}/{}).", numberEntries, capacity);

return true;
  } else {
LOG.debug("Failed to put element into ordered stream element queue 
because it " +
{code}
I guess OrderedStreamElementQueue was coded first.

  was:
The following are two examples where ordered stream element queue is mentioned:
{code}
LOG.debug("Put element into ordered stream element queue. New filling 
degree " +
  "({}/{}).", numberEntries, capacity);

return true;
  } else {
LOG.debug("Failed to put element into ordered stream element queue 
because it " +
{code}
I guess OrderedStreamElementQueue was coded first.


> Misleading log statements in UnorderedStreamElementQueue
> 
>
> Key: FLINK-5376
> URL: https://issues.apache.org/jira/browse/FLINK-5376
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Priority: Minor
>
> The following are two examples where ordered stream element queue is 
> mentioned:
> {code}
> LOG.debug("Put element into ordered stream element queue. New filling 
> degree " +
>   "({}/{}).", numberEntries, capacity);
> return true;
>   } else {
> LOG.debug("Failed to put element into ordered stream element queue 
> because it " +
> {code}
> I guess OrderedStreamElementQueue was coded first.





[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-03-17 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822346#comment-15822346
 ] 

Ted Yu edited comment on FLINK-5486 at 3/17/17 5:19 PM:


Lock on State.bucketStates should be held in the following method:
{code}
  private void handleRestoredBucketState(State restoredState) {
Preconditions.checkNotNull(restoredState);

for (BucketState bucketState : restoredState.bucketStates.values()) {
{code}


was (Author: yuzhih...@gmail.com):
Lock on State.bucketStates should be held in the following method:

{code}
  private void handleRestoredBucketState(State restoredState) {
Preconditions.checkNotNull(restoredState);

for (BucketState bucketState : restoredState.bucketStates.values()) {
{code}

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
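A minimal sketch of the suggested ordering, with made-up types standing in for the sink's state (the real BucketingSink code is more involved):

{code}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SynchronizedRestoreSketch {

    // Take the lock BEFORE handing the map to the handler, so no other thread can
    // clear entries while they are still being processed.
    static void handleRestoredState(Map<Long, List<String>> pendingFilesPerCheckpoint) {
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }

    static void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pending) {
        pending.forEach((checkpointId, files) ->
            files.forEach(f -> System.out.println("finalize " + f + " from checkpoint " + checkpointId)));
    }

    public static void main(String[] args) {
        Map<Long, List<String>> restored = new HashMap<>();
        restored.put(1L, Arrays.asList("part-0-0.pending", "part-0-1.pending"));
        handleRestoredState(restored);
    }
}
{code}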





[jira] [Assigned] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-17 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-5985:
-

Assignee: Stefan Richter

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.3.0, 1.2.1
>
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)





[jira] [Closed] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-17 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-5985.
-
   Resolution: Fixed
Fix Version/s: 1.2.1
   1.3.0

fixed in 20fff32.

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.3.0, 1.2.1
>
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)





[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930324#comment-15930324
 ] 

ASF GitHub Bot commented on FLINK-5985:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3523


> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
> Fix For: 1.3.0, 1.2.1
>
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)





[GitHub] flink pull request #3523: [FLINK-5985] Report no task states for stateless t...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3523




[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930320#comment-15930320
 ] 

ASF GitHub Bot commented on FLINK-5985:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3523
  
Thanks for the review @StephanEwen. I updated the test as suggested. 
Merging this now.


> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)





[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks on...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3523
  
Thanks for the review @StephanEwen. I updated the test as suggested. 
Merging this now.




[jira] [Commented] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) in parallel

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930311#comment-15930311
 ] 

ASF GitHub Bot commented on FLINK-6020:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
The second rename will not fail, but it corrupts the file written by the 
first one, which makes the first job fail if a task is loading this jar.

By the way, the jar file is also uploaded to HDFS for recovery, and that 
upload fails too if two or more clients write a file with the same name.

It is easy to reproduce. First launch a session with enough slots, then run a 
script containing many identical job submissions, say 20 lines of "flink run 
../examples/streaming/WindowJoin.jar &". Make sure there is an "&" at the end of 
each line so that they run in parallel.


> Blob Server cannot handle multiple job submits (with same content) in parallel
> ---
>
> Key: FLINK-6020
> URL: https://issues.apache.org/jira/browse/FLINK-6020
> Project: Flink
>  Issue Type: Bug
>Reporter: Tao Wang
>Assignee: Tao Wang
>Priority: Critical
>
> In yarn-cluster mode, if we submit the same job multiple times in parallel, 
> the tasks will encounter class-loading problems and lease occupation.
> The blob server stores user jars under names derived from their generated SHA-1 
> sums: it first writes a temp file and then moves it to finalize. For recovery it 
> also puts them to HDFS under the same file name.
> When multiple clients submit the same job with the same jar at the same time, 
> the local jar files in the blob server and the files on HDFS are handled by 
> multiple threads (BlobServerConnection) and impact each other.
> It would be better to have a way to handle this; two ideas come to mind:
> 1. lock the write operation, or
> 2. use some unique identifier as the file name instead of (or in addition to) 
> the SHA-1 sum of the file contents.
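As a rough sketch of idea 2 above (illustrative only; the file names, the SHA-1 value, and the staging scheme are invented and do not reflect the actual BlobServer code):

{code}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class BlobStagingSketch {

    // Stage the upload under a per-writer unique temp name so concurrent submissions
    // of the same jar never write to, or rename over, each other's half-written file.
    static Path store(Path dir, String sha1, byte[] content) throws Exception {
        Path tmp = dir.resolve(sha1 + ".tmp-" + UUID.randomUUID());
        Files.write(tmp, content);
        Path target = dir.resolve(sha1);
        if (Files.exists(target)) {
            // Another writer finished first; the contents are identical, so reuse it.
            Files.delete(tmp);
        } else {
            // A narrow race remains between the check and the move, but both writers
            // hold byte-identical content.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        }
        return target;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("blob-sketch");
        byte[] jar = "same jar bytes".getBytes();
        System.out.println(store(dir, "da39a3ee5e6b", jar));
        System.out.println(store(dir, "da39a3ee5e6b", jar));
    }
}
{code}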





[GitHub] flink issue #3525: [FLINK-6020]add a random integer suffix to blob key to av...

2017-03-17 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3525
  
The second rename will not fail, but it corrupts the file written by the 
first one, which makes the first job fail if a task is loading this jar.

By the way, the jar file is also uploaded to HDFS for recovery, and that 
upload fails too if two or more clients write a file with the same name.

It is easy to reproduce. First launch a session with enough slots, then run a 
script containing many identical job submissions, say 20 lines of "flink run 
../examples/streaming/WindowJoin.jar &". Make sure there is an "&" at the end of 
each line so that they run in parallel.




[jira] [Updated] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()

2017-03-17 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5541:
--
Description: 
{code}
  if (localJar == null) {
try {
  for (final URL url : ((ContextEnvironment) 
ExecutionEnvironment.getExecutionEnvironment())
  .getJars()) {
// TODO verify that there is only one jar
localJar = new File(url.toURI()).getAbsolutePath();
  }
} catch (final URISyntaxException e) {
  // ignore
} catch (final ClassCastException e) {
  // ignore
}
  }

  logger.info("Submitting topology " + name + " in distributed mode with 
conf " + serConf);
  client.submitTopologyWithOpts(name, localJar, topology);
{code}
Since the try block may encounter URISyntaxException / ClassCastException, we 
should check that localJar is not null before calling submitTopologyWithOpts().

  was:
{code}
  if (localJar == null) {
try {
  for (final URL url : ((ContextEnvironment) 
ExecutionEnvironment.getExecutionEnvironment())
  .getJars()) {
// TODO verify that there is only one jar
localJar = new File(url.toURI()).getAbsolutePath();
  }
} catch (final URISyntaxException e) {
  // ignore
} catch (final ClassCastException e) {
  // ignore
}
  }

  logger.info("Submitting topology " + name + " in distributed mode with 
conf " + serConf);
  client.submitTopologyWithOpts(name, localJar, topology);
{code}

Since the try block may encounter URISyntaxException / ClassCastException, we 
should check that localJar is not null before calling submitTopologyWithOpts().


> Missing null check for localJar in FlinkSubmitter#submitTopology()
> --
>
> Key: FLINK-5541
> URL: https://issues.apache.org/jira/browse/FLINK-5541
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   if (localJar == null) {
> try {
>   for (final URL url : ((ContextEnvironment) 
> ExecutionEnvironment.getExecutionEnvironment())
>   .getJars()) {
> // TODO verify that there is only one jar
> localJar = new File(url.toURI()).getAbsolutePath();
>   }
> } catch (final URISyntaxException e) {
>   // ignore
> } catch (final ClassCastException e) {
>   // ignore
> }
>   }
>   logger.info("Submitting topology " + name + " in distributed mode with 
> conf " + serConf);
>   client.submitTopologyWithOpts(name, localJar, topology);
> {code}
> Since the try block may encounter URISyntaxException / ClassCastException, we 
> should check that localJar is not null before calling 
> submitTopologyWithOpts().
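A self-contained sketch of the missing guard (the method and its inputs are made up; only the null check mirrors the suggestion above):

{code}
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;

public class LocalJarGuardSketch {

    // Resolve the jar as in the snippet above, but fail fast with a clear message
    // instead of passing a possibly-null path on to the submission call.
    static String resolveLocalJar(URL[] candidateJars) {
        String localJar = null;
        for (URL url : candidateJars) {
            try {
                localJar = new File(url.toURI()).getAbsolutePath();
            } catch (URISyntaxException e) {
                // ignore, as in the original code
            }
        }
        if (localJar == null) {
            throw new IllegalStateException("No local jar could be resolved for submission.");
        }
        return localJar;
    }

    public static void main(String[] args) throws Exception {
        URL jar = new File("example.jar").toURI().toURL();
        System.out.println(resolveLocalJar(new URL[] { jar }));
    }
}
{code}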





[jira] [Comment Edited] (FLINK-5629) Unclosed RandomAccessFile in StaticFileServerHandler#respondAsLeader()

2017-03-17 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854366#comment-15854366
 ] 

Ted Yu edited comment on FLINK-5629 at 3/17/17 5:01 PM:


RandomAccessFile#length() may throw IOE.
raf is used in the following code path where DefaultFileRegion is not involved:
{code}
} else {
  lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new 
ChunkedFile(raf, 0, fileLength, 8192)),
{code}
It is good practice to close RandomAccessFile in all code paths.


was (Author: yuzhih...@gmail.com):
RandomAccessFile#length() may throw IOE.
raf is used in the following code path where DefaultFileRegion is not involved:

{code}
} else {
  lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new 
ChunkedFile(raf, 0, fileLength, 8192)),
{code}
It is good practice to close RandomAccessFile in all code paths.

> Unclosed RandomAccessFile in StaticFileServerHandler#respondAsLeader()
> --
>
> Key: FLINK-5629
> URL: https://issues.apache.org/jira/browse/FLINK-5629
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> final RandomAccessFile raf;
> try {
>   raf = new RandomAccessFile(file, "r");
> ...
> long fileLength = raf.length();
> {code}
> The RandomAccessFile should be closed upon return from method.
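The general pattern is try-with-resources; a minimal sketch follows (the real handler hands the file off to Netty in one branch, so ownership transfer needs more care than shown here):

{code}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class CloseRafSketch {

    // Open the file in try-with-resources so it is closed on every code path,
    // including the one where length() itself throws an IOException.
    static long lengthOf(File file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            return raf.length();
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("raf-sketch", ".bin");
        tmp.deleteOnExit();
        System.out.println(lengthOf(tmp));
    }
}
{code}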





[jira] [Updated] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink

2017-03-17 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5855:
--
Description: 
{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);

synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
{code}

Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
handlePendingFilesForPreviousCheckpoints().

  was:
{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);

synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
{code}
Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
handlePendingFilesForPreviousCheckpoints().


> Unprotected access to pendingFilesPerCheckpoint in BucketingSink
> 
>
> Key: FLINK-5855
> URL: https://issues.apache.org/jira/browse/FLINK-5855
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
> synchronized (restoredState.pendingFilesPerCheckpoint) {
>   restoredState.pendingFilesPerCheckpoint.clear();
> {code}
> Lock on pendingFilesPerCheckpoint should be obtained prior to the call to 
> handlePendingFilesForPreviousCheckpoints().





[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106685158
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 ---
@@ -48,15 +49,25 @@
 * output selection).
 */
private final List selectedNames;
+
+   /**
+* The side-output tag (if any) of this {@link StreamEdge}.
+*/
+   private final OutputTag outputTag;
+
+   /**
+* The {@link StreamPartitioner} on this {@link StreamEdge}.
+*/
private StreamPartitioner outputPartitioner;
 
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int 
typeNumber,
-   List selectedNames, StreamPartitioner 
outputPartitioner) {
+   List selectedNames, StreamPartitioner 
outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
+   this.outputTag = outputTag;
 
--- End diff --

Not sure what the edge id does exactly or who uses it, so I prefer not to touch 
it for now.




[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674262
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws 
IOException {
 
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+   LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
// loop over retries
int attempt = 0;
while (true) {
+   try (
+   final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
+   final InputStream is = bc.get(requiredBlob);
+   final OutputStream os = new 
FileOutputStream(localJarFile)
+   ) {
+   getURLTransferFile(buf, is, os);
+
+   // success, we finished
+   return localJarFile.toURI().toURL();
+   }
+   catch (Throwable t) {
+   getURLOnException(requiredBlob.toString(), 
localJarFile, attempt, t);
 
-   if (attempt == 0) {
-   LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
-   } else {
+   // retry
+   ++attempt;
LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
}
+   } // end loop over retries
+   }
 
-   try {
-   BlobClient bc = null;
-   InputStream is = null;
-   OutputStream os = null;
+   /**
+* Returns the URL for the BLOB with the given parameters. The method 
will first attempt to
+* serve the BLOB from its local cache. If the BLOB is not in the 
cache, the method will try
+* to download it from this cache's BLOB server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+* @return URL referring to the local storage location of the BLOB.
+* @throws java.io.FileNotFoundException if the path does not exist;
+* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+*/
+   public URL getURL(final JobID jobId, final String key) throws 
IOException {
+   checkArgument(jobId != null, "Job id cannot be null.");
+   checkArgument(key != null, "BLOB name cannot be null.");
 
-   try {
-   bc = new BlobClient(serverAddress, 
blobClientConfig);
-   is = bc.get(requiredBlob);
-   os = new FileOutputStream(localJarFile);
-
-   while (true) {
-   final int read = is.read(buf);
-   if (read < 0) {
-   break;
-   }
-   os.write(buf, 0, read);
-   }
-
-   // we do explicitly not use a finally 
block, because we want the closing
-   // in the regular case to throw 
exceptions and cause the writing to fail.
-   // But, the closing on exception should 
not throw further exceptions and
-   // let us keep the root exception
-   os.close();
-   os = null;
-   is.close();
-   is = null;
-   bc.close();
-   bc = null;
-
-   // success, we finished
-   return localJarFile.toURI().toURL();
-   }
-   catch (Throwable t) {
-   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exception
-   // it would be replaced by any 
exception thrown in the finally block
-   IOUtils.closeQuietly(os);
-   IOUtils.closeQuietly(is);
-  

[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930241#comment-15930241
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3484
  
Thanks for reviewing again, @kl0u! I incorporated all your suggestions. I'm 
now waiting for travis to give the green light and then I'll merge.

@chenqin A lot of thanks also to you for working on this and pushing it 
with me!  


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing





[GitHub] flink issue #3484: [FLINK-4460] Side Outputs in Flink

2017-03-17 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3484
  
Thanks for reviewing again, @kl0u! I incorporated all your suggestions. I'm 
now waiting for travis to give the green light and then I'll merge.

@chenqin A lot of thanks also to you for working on this and pushing it 
with me! 😃 




[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930235#comment-15930235
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106688499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -191,3 +287,31 @@ class DataStreamOverAggregate(
 
 }
 
+object DataStreamProcTimeCase {
+  class ProcTimeTimestampExtractor
--- End diff --

@fhueske 
Given also your previous comment, I would assume this goes down... which I did.


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).





[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106688499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -191,3 +287,31 @@ class DataStreamOverAggregate(
 
 }
 
+object DataStreamProcTimeCase {
+  class ProcTimeTimestampExtractor
--- End diff --

@fhueske 
Given also your previous comment, I would assume this goes down... which I did.




[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930234#comment-15930234
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106688280
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -785,7 +785,7 @@ object AggregateUtil {
 (propPos._1, propPos._2)
   }
 
-  private def transformToAggregateFunctions(
--- End diff --

It is not too much logic... and the overall 
"createTimeBoundedProcessingTimeOverWindow" is not bigger than the other method 
for the unbounded case.
...as I said, it is the same, so as you prefer.


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).





[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread rtudoran
Github user rtudoran commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106688280
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -785,7 +785,7 @@ object AggregateUtil {
 (propPos._1, propPos._2)
   }
 
-  private def transformToAggregateFunctions(
--- End diff --

It is not too much logic... and the overall 
"createTimeBoundedProcessingTimeOverWindow" is not bigger than the other method 
for the unbounded case.
...as I said, it is the same, so as you prefer.




[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106673680
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService extends 
InternalTimerService {
+   
+   private static Logger LOG = 
LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+   
+   /** The data base where stores all timers */
+   private final RocksDB db;
+   
+   /** The path where the rocksdb locates */
+   private final Path dbPath;
+
+   /**
+* The in-memory heaps backed by rocksdb to retrieve the next timer to 
trigger. Each
+* partition's leader is stored in the heap. When the timers in a 
partition is changed, we
+* will change the partition's leader and update the heap accordingly.
+*/
+   private final int numPartitions;
+   private final PersistentTimerHeap eventTimeHeap;
+   private final PersistentTimerHeap processingTimeHeap;
+   
+   private static int MAX_PARTITIONS = (1 << 16);
+
+   public RocksDBInternalTimerService(
+   int totalKeyGroups,
+   KeyGroupRange keyGroupRange,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   Path dbPath) {
+
+   super(totalKeyGroups, keyGroupRange, keyContext, 
processingTimeService);
+   
+   this.dbPath = dbPath;
+   
+   try {
+   FileSystem fileSystem = this.dbPath.getFileSystem();
+   if (fileSystem.exists(this.dbPath)) {
+   fileSystem.delete(this.dbPath, true);
+   }
+   
+   fileSystem.mkdirs(dbPath);
+   } catch (IOException e) {
+   throw new RuntimeException("Error while creating 
directory for rocksdb timer service.", e);
+   }
+
+   ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions()
+  

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106677330
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
+   
+   private static Logger LOG = 
LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+   
+   /** The database where all the timers are stored */
+   private final RocksDB db;
+   
+   /** The path where the RocksDB instance is located */
+   private final Path dbPath;
+
+   /**
+* The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
+* partition's leader is stored in the heap. When the timers in a partition are changed, we
+* will change the partition's leader and update the heap accordingly.
+*/
+   private final int numPartitions;
+   private final PersistentTimerHeap eventTimeHeap;
+   private final PersistentTimerHeap processingTimeHeap;
+   
+   private static int MAX_PARTITIONS = (1 << 16);
+
+   public RocksDBInternalTimerService(
+   int totalKeyGroups,
+   KeyGroupRange keyGroupRange,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   Path dbPath) {
+
+   super(totalKeyGroups, keyGroupRange, keyContext, 
processingTimeService);
+   
+   this.dbPath = dbPath;
+   
+   try {
+   FileSystem fileSystem = this.dbPath.getFileSystem();
+   if (fileSystem.exists(this.dbPath)) {
+   fileSystem.delete(this.dbPath, true);
+   }
+   
+   fileSystem.mkdirs(dbPath);
+   } catch (IOException e) {
+   throw new RuntimeException("Error while creating 
directory for rocksdb timer service.", e);
+   }
+
+   ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions()
+  
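
As a side note on the heap Javadoc quoted above, a much-simplified, hypothetical sketch of the "partition leader" idea (class and field names are made up; in the PR the timers themselves live in RocksDB and only each partition's earliest timer is kept in memory):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.PriorityQueue;

    /** Orders partitions by their earliest ("leader") timer timestamp. */
    final class PartitionLeaderHeap {

        /** Earliest timer timestamp per partition, Long.MAX_VALUE if the partition has no timers. */
        private final long[] leaders;

        /** Partition ids, ordered by the timestamp of their current leader. */
        private final PriorityQueue<Integer> heap;

        PartitionLeaderHeap(int numPartitions) {
            this.leaders = new long[numPartitions];
            Arrays.fill(leaders, Long.MAX_VALUE);
            this.heap = new PriorityQueue<>(Math.max(1, numPartitions),
                    Comparator.comparingLong((Integer p) -> leaders[p]));
            for (int p = 0; p < numPartitions; p++) {
                heap.add(p);
            }
        }

        /** Called when a partition's timers change and its new earliest timestamp is known. */
        void updateLeader(int partition, long earliestTimestamp) {
            heap.remove(partition);              // remove and re-add so the ordering stays correct
            leaders[partition] = earliestTimestamp;
            heap.add(partition);
        }

        /** Timestamp of the globally next timer to trigger. */
        long nextTimestamp() {
            return leaders[heap.peek()];
        }
    }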

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106670661
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -18,43 +18,306 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Interface for working with time and timers.
  *
  * This is the internal version of {@link 
org.apache.flink.streaming.api.TimerService}
  * that allows to specify a key and a namespace to which timers should be 
scoped.
  *
+ * All d
+ * 
+ * @param <K> Type of the keys in the stream
  * @param <N> Type of the namespace to which timers are scoped.
  */
 @Internal
-public interface InternalTimerService<N> {
+public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
+
+   protected final ProcessingTimeService processingTimeService;
+
+   protected final KeyContext keyContext;
+
+   protected final int totalKeyGroups;
+
+   protected final KeyGroupRange keyGroupRange;
+
+   /**
+* The one and only Future (if any) registered to execute the
+* next {@link Triggerable} action, when its (processing) time arrives.
+*/
+   protected ScheduledFuture nextTimer;
+
+   /**
+* The local event time, as denoted by the last received
+* {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
+*/
+   private long currentWatermark = Long.MIN_VALUE;
+
+   // Variables to be set when the service is started.
+
+   protected TypeSerializer keySerializer;
+
+   protected TypeSerializer namespaceSerializer;
+
+   private InternalTimer.TimerSerializer timerSerializer;
+
+   protected Triggerable triggerTarget;
+
+   private volatile boolean isInitialized;
+
+   public InternalTimerService(
+   int totalKeyGroups, 
+   KeyGroupRange keyGroupRange, 
+   KeyContext keyContext, 
+   ProcessingTimeService processingTimeService) {
+   
+   this.totalKeyGroups = totalKeyGroups;
+   this.keyGroupRange = checkNotNull(keyGroupRange);
+   this.keyContext = checkNotNull(keyContext);
+   this.processingTimeService = 
checkNotNull(processingTimeService);
+   }
 
/** Returns the current processing time. */
-   long currentProcessingTime();
+   public long currentProcessingTime() {
+   return processingTimeService.getCurrentProcessingTime();
+   }
 
/** Returns the current event-time watermark. */
-   long currentWatermark();
+   public long currentWatermark() {
+   return currentWatermark;
+   }
 
/**
 * Registers a timer to be fired when processing time passes the given 
time. The namespace
 * you pass here will be provided when the timer fires.
 */
-   void registerProcessingTimeTimer(N namespace, long time);
+   abstract public void registerProcessingTimeTimer(N namespace, long 
time);
 
/**
 * Deletes the timer for the given key and namespace.
 */
-   void deleteProcessingTimeTimer(N namespace, long time);
+   abstract public void deleteProcessingTimeTimer(N namespace, long time);
 
/**
 * Registers a timer to be fired when the event-time watermark passes the given time. The namespace
 * you pass here will be provided when the timer fires.
 */
-   void registerEventTimeTimer(N namespace, long time);
+   abstract public void registerEventTimeTimer(N namespace, long time);
 
/**
 * Deletes the timer for the given key and namespace.
  

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106670841
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106675685
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106671203
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106674528
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106673233
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService<K, N> extends InternalTimerService<K, N> {
+   
+   private static Logger LOG = 
LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+   
+   /** The database where all the timers are stored */
+   private final RocksDB db;
+   
+   /** The path where the RocksDB instance is located */
+   private final Path dbPath;
+
+   /**
+* The in-memory heaps, backed by RocksDB, used to retrieve the next timer to trigger. Each
+* partition's leader is stored in the heap. When the timers in a partition are changed, we
+* will change the partition's leader and update the heap accordingly.
+*/
+   private final int numPartitions;
+   private final PersistentTimerHeap eventTimeHeap;
+   private final PersistentTimerHeap processingTimeHeap;
+   
+   private static int MAX_PARTITIONS = (1 << 16);
--- End diff --

I think this should be initialized as 
`KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM` ?
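
A minimal sketch of that suggestion (assuming `UPPER_BOUND_MAX_PARALLELISM` keeps its current value of `1 << 15`, which would also halve the hard-coded `1 << 16` above; the holder class below is made up for illustration):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    class TimerServiceDefaults {
        // Tie the partition limit to Flink's maximum parallelism instead of a magic number.
        static final int MAX_PARTITIONS = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
    }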


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106675210
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106680293
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106669225
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -18,43 +18,306 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Interface for working with time and timers.
  *
  * This is the internal version of {@link 
org.apache.flink.streaming.api.TimerService}
  * that allows to specify a key and a namespace to which timers should be 
scoped.
  *
+ * All d
--- End diff --

Unfinished comment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106674160
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106674867
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106670283
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106669974
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -18,43 +18,306 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Interface for working with time and timers.
  *
  * This is the internal version of {@link 
org.apache.flink.streaming.api.TimerService}
  * that allows to specify a key and a namespace to which timers should be 
scoped.
  *
+ * All d
+ * 
+ * @param <K> Type of the keys in the stream
  * @param <N> Type of the namespace to which timers are scoped.
  */
 @Internal
-public interface InternalTimerService<N> {
+public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
--- End diff --

I would suggest still keeping the old interface and renaming this class to
`AbstractInternalTimerService implements InternalTimerService`. That way we
don't need to introduce the generic parameter K in all places, which actually
gives away an implementation detail (K is used only for a member, not for the
interface methods). I would also like to keep the interface slim; probably not
every piece of code that deals with `InternalTimerService` has to see all the
methods, e.g. those for snapshots.
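
A rough sketch of the shape this suggestion implies (the method subset and generics are assumptions taken from the diff above, not an actual implementation):

    /** Slim interface: callers never see the key type K. */
    interface InternalTimerService<N> {
        long currentProcessingTime();
        long currentWatermark();
        void registerProcessingTimeTimer(N namespace, long time);
        void deleteProcessingTimeTimer(N namespace, long time);
        void registerEventTimeTimer(N namespace, long time);
        void deleteEventTimeTimer(N namespace, long time);
    }

    /** Base class that keeps K as an implementation detail, e.g. for per-key-group snapshots. */
    abstract class AbstractInternalTimerService<K, N> implements InternalTimerService<N> {
        // key serializer, key-group bookkeeping and snapshot/restore logic would live here
    }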


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106677685
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---

[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677990
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
}
 
/**
+* Deletes the file associated with the given job and key if it exists 
in the local
+* storage of the blob server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+*/
+   @Override
+   public void delete(JobID jobId, String key) {
+   checkArgument(jobId != null, "Job id must not be null.");
+   checkArgument(key != null, "BLOB name must not be null.");
+
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, key);
+
+   if (localFile.exists()) {
+   if (!localFile.delete()) {
+   LOG.warn("Failed to delete locally BLOB " + key 
+ " at " + localFile.getAbsolutePath());
+   }
+   }
+
+   blobStore.delete(jobId, key);
+   }
+
+   /**
+* Deletes all files associated with the given job id from the storage.
+*
+* @param jobId JobID of the files in the blob store
+*/
+   @Override
+   public void deleteAll(final JobID jobId) {
+   checkArgument(jobId != null, "Job id must not be null.");
+
+   try {
+   BlobUtils.deleteJobDirectory(storageDir, jobId);
+   } catch (IOException e) {
--- End diff --

If we want to make sure we clean up in any case, we can actually catch 
`Exception` here.
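
A minimal sketch of that change, reusing the fields from the diff above (only the 
catch clause differs; the remainder of the method is elided in the quote):

```java
@Override
public void deleteAll(final JobID jobId) {
    checkArgument(jobId != null, "Job id must not be null.");

    try {
        BlobUtils.deleteJobDirectory(storageDir, jobId);
    } catch (Exception e) {
        // catch Exception rather than IOException so that no cleanup failure
        // aborts the rest of the delete path; just log and continue
        LOG.warn("Failed to delete BLOB storage directory for job " + jobId, e);
    }
    // ... rest of the method as in the original diff
}
```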


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674837
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws 
IOException {
 
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+   LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
// loop over retries
int attempt = 0;
while (true) {
+   try (
+   final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
+   final InputStream is = bc.get(requiredBlob);
+   final OutputStream os = new 
FileOutputStream(localJarFile)
+   ) {
+   getURLTransferFile(buf, is, os);
+
+   // success, we finished
+   return localJarFile.toURI().toURL();
+   }
+   catch (Throwable t) {
+   getURLOnException(requiredBlob.toString(), 
localJarFile, attempt, t);
 
-   if (attempt == 0) {
-   LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
-   } else {
+   // retry
+   ++attempt;
LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
}
+   } // end loop over retries
+   }
 
-   try {
-   BlobClient bc = null;
-   InputStream is = null;
-   OutputStream os = null;
+   /**
+* Returns the URL for the BLOB with the given parameters. The method 
will first attempt to
+* serve the BLOB from its local cache. If the BLOB is not in the 
cache, the method will try
+* to download it from this cache's BLOB server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+* @return URL referring to the local storage location of the BLOB.
+* @throws java.io.FileNotFoundException if the path does not exist;
+* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+*/
+   public URL getURL(final JobID jobId, final String key) throws 
IOException {
+   checkArgument(jobId != null, "Job id cannot be null.");
+   checkArgument(key != null, "BLOB name cannot be null.");
 
-   try {
-   bc = new BlobClient(serverAddress, 
blobClientConfig);
-   is = bc.get(requiredBlob);
-   os = new FileOutputStream(localJarFile);
-
-   while (true) {
-   final int read = is.read(buf);
-   if (read < 0) {
-   break;
-   }
-   os.write(buf, 0, read);
-   }
-
-   // we do explicitly not use a finally 
block, because we want the closing
-   // in the regular case to throw 
exceptions and cause the writing to fail.
-   // But, the closing on exception should 
not throw further exceptions and
-   // let us keep the root exception
-   os.close();
-   os = null;
-   is.close();
-   is = null;
-   bc.close();
-   bc = null;
-
-   // success, we finished
-   return localJarFile.toURI().toURL();
-   }
-   catch (Throwable t) {
-   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exception
-   // it would be replaced by any 
exception thrown in the finally block
-   IOUtils.closeQuietly(os);
-   IOUtils.closeQuietly(is);
-  

[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106675058
  
--- Diff: docs/setup/config.md ---
@@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the 
root dir under which the ZooKeeper HA mode will create namespace directories. 
Previously this ket was named `recovery.zookeeper.path.root`.
 
-- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in 
standalone cluster mode, or the  under YARN) Defines the 
subdirectory under the root dir where the ZooKeeper HA mode will create znodes. 
This allows to isolate multiple applications on the same ZooKeeper. Previously 
this key was named `recovery.zookeeper.path.namespace`.
+- `high-availability.cluster-id`: (Default `/default_ns` in standalone 
cluster mode, or the  under YARN) Defines the subdirectory 
under the root dir where the ZooKeeper HA mode will create znodes. This allows 
to isolate multiple applications on the same ZooKeeper. Previously this key was 
named `recovery.zookeeper.path.namespace` and 
`high-availability.zookeeper.path.namespace`.
--- End diff --

I would move these into the `### High Availability (HA)` section, because they 
are independent of ZooKeeper.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106680476
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1305,6 +1305,9 @@ class TaskManager(
 s"${task.getExecutionState} to JobManager for task 
${task.getTaskInfo.getTaskName} " +
 s"(${task.getExecutionId})")
 
+  // delete all NAME_ADDRESSABLE BLOBs
+  libraryCacheManager.get.getBlobService.deleteAll(task.getJobID)
--- End diff --

Multiple tasks of the same job run in a TaskManager. This means that tasks 
would delete each other's blobs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3512: [FLINK-6008] collection of BlobServer improvements

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677799
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
}
 
/**
+* Deletes the file associated with the given job and key if it exists 
in the local
+* storage of the blob server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+*/
+   @Override
+   public void delete(JobID jobId, String key) {
+   checkArgument(jobId != null, "Job id must not be null.");
+   checkArgument(key != null, "BLOB name must not be null.");
+
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, key);
+
+   if (localFile.exists()) {
--- End diff --

For concurrency safety, it is better to do `if (!delete && exists)`
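
A minimal sketch of that pattern, using the same fields as the quoted diff 
(illustrative only):

```java
// Attempt the delete first and only check existence afterwards, so a concurrent
// delete between the two calls is not reported as a failure.
final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, key);

if (!localFile.delete() && localFile.exists()) {
    LOG.warn("Failed to delete locally stored BLOB " + key
            + " at " + localFile.getAbsolutePath());
}

blobStore.delete(jobId, key);
```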


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930220#comment-15930220
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106685158
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 ---
@@ -48,15 +49,25 @@
 * output selection).
 */
private final List selectedNames;
+
+   /**
+* The side-output tag (if any) of this {@link StreamEdge}.
+*/
+   private final OutputTag outputTag;
+
+   /**
+* The {@link StreamPartitioner} on this {@link StreamEdge}.
+*/
private StreamPartitioner outputPartitioner;
 
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int 
typeNumber,
-   List selectedNames, StreamPartitioner 
outputPartitioner) {
+   List selectedNames, StreamPartitioner 
outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
+   this.outputTag = outputTag;
 
--- End diff --

Not sure what exactly the edge ID does or who uses it, so I prefer not to 
touch it for now.


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6027) Ignore the exception thrown by the subsuming of old completed checkpoints

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930221#comment-15930221
 ] 

ASF GitHub Bot commented on FLINK-6027:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3521
  
+1, merging this...


> Ignore the exception thrown by the subsuming of old completed checkpoints
> -
>
> Key: FLINK-6027
> URL: https://issues.apache.org/jira/browse/FLINK-6027
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> When a checkpoint is added to the {{CompletedCheckpointStore}} via the 
> method {{addCheckpoint()}}, the oldest checkpoints are removed from the 
> store if the number of stored checkpoints exceeds the given limit. The 
> subsuming of old checkpoints may fail and make {{addCheckpoint()}} throw 
> exceptions, which are caught by the {{CheckpointCoordinator}}. The 
> {{CheckpointCoordinator}} then deletes the states of the new checkpoint. Because 
> the new checkpoint is still in the store, we may attempt to recover the job from 
> it, but the recovery will fail as the states of the checkpoint have already 
> been deleted.
> We should ignore the exceptions thrown by the subsuming of old checkpoints, 
> because we can always recover from the new checkpoint once it has been 
> successfully added to the store. Ignoring them may leave some dirty data, but 
> that is acceptable because it can be cleaned up with the cleanup hook to be 
> introduced in the near future.
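> A minimal sketch of the proposed behavior, with illustrative names (not the actual 
> {{CompletedCheckpointStore}} code):
> {code}
> // Sketch: failures while subsuming *old* checkpoints are logged and ignored, so the
> // newly added checkpoint is never discarded just because cleanup of an old one failed.
> public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
>     completedCheckpoints.add(checkpoint);   // adding the new checkpoint may still fail
> 
>     while (completedCheckpoints.size() > maxCheckpointsToRetain) {
>         CompletedCheckpoint oldest = completedCheckpoints.remove(0);
>         try {
>             oldest.discard();
>         } catch (Exception e) {
>             LOG.warn("Failed to subsume old checkpoint " + oldest + ", ignoring.", e);
>         }
>     }
> }
> {code}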



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3521: [FLINK-6027][checkpoint] Ignore the exception thrown by t...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3521
  
+1, merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4364) Implement heartbeat logic between TaskManager and JobManager

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930219#comment-15930219
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3151


> Implement heartbeat logic between TaskManager and JobManager
> 
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhijiang
>Assignee: zhijiang
>
> This is part of the work for FLIP-6.
> The {{HeartbeatManager}} is mainly used for monitoring heartbeat targets and 
> reporting payloads.
> On the {{JobManager}} side, it starts monitoring the {{HeartbeatTarget}} 
> when it receives a registration from a {{TaskManager}}, and schedules a task to 
> {{requestHeartbeat}} at a fixed interval. If no heartbeat response is received 
> within the timeout, the {{HeartbeatListener}} reports a heartbeat 
> timeout, and the {{JobManager}} should then remove the internally registered 
> {{TaskManager}}.
> On the {{TaskManager}} side, it starts monitoring the {{HeartbeatTarget}} 
> when it receives the registration acknowledgement from the {{JobManager}}. It will 
> also be notified of a heartbeat timeout if no heartbeat request is received from 
> the {{JobManager}} within the timeout.
> The current implementation does not exchange payloads via heartbeats; this 
> can be added later if needed.
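> A minimal, self-contained sketch of the monitoring loop described above, with purely 
> illustrative names (this is not the FLIP-6 {{HeartbeatManager}} API):
> {code}
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> 
> class HeartbeatMonitorSketch {
>     private final ScheduledExecutorService scheduler =
>             Executors.newSingleThreadScheduledExecutor();
>     private volatile long lastHeartbeatTimestamp = System.currentTimeMillis();
> 
>     /** Periodically request heartbeats; report a timeout if no response arrived in time. */
>     void monitor(Runnable requestHeartbeat, Runnable notifyHeartbeatTimeout,
>                  long intervalMillis, long timeoutMillis) {
>         scheduler.scheduleAtFixedRate(() -> {
>             if (System.currentTimeMillis() - lastHeartbeatTimestamp > timeoutMillis) {
>                 notifyHeartbeatTimeout.run();   // target is considered lost
>             } else {
>                 requestHeartbeat.run();
>             }
>         }, 0L, intervalMillis, TimeUnit.MILLISECONDS);
>     }
> 
>     /** Called whenever a heartbeat response (or request, on the TaskManager side) arrives. */
>     void reportHeartbeat() {
>         lastHeartbeatTimestamp = System.currentTimeMillis();
>     }
> }
> {code}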



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-4364) Implement heartbeat logic between TaskManager and JobManager

2017-03-17 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4364.
--
Resolution: Fixed

Added via 0b3d5c27f4ab7b2dffb37160a1f01cb822bb696e

> Implement heartbeat logic between TaskManager and JobManager
> 
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhijiang
>Assignee: zhijiang
>
> This is part of the work for FLIP-6.
> The {{HeartbeatManager}} is mainly used for monitoring heartbeat targets and 
> reporting payloads.
> On the {{JobManager}} side, it starts monitoring the {{HeartbeatTarget}} 
> when it receives a registration from a {{TaskManager}}, and schedules a task to 
> {{requestHeartbeat}} at a fixed interval. If no heartbeat response is received 
> within the timeout, the {{HeartbeatListener}} reports a heartbeat 
> timeout, and the {{JobManager}} should then remove the internally registered 
> {{TaskManager}}.
> On the {{TaskManager}} side, it starts monitoring the {{HeartbeatTarget}} 
> when it receives the registration acknowledgement from the {{JobManager}}. It will 
> also be notified of a heartbeat timeout if no heartbeat request is received from 
> the {{JobManager}} within the timeout.
> The current implementation does not exchange payloads via heartbeats; this 
> can be added later if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskMana...

2017-03-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3151


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930214#comment-15930214
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106684413
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and 
allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

That is what I remember as well.


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930207#comment-15930207
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106675058
  
--- Diff: docs/setup/config.md ---
@@ -494,13 +494,13 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the 
root dir under which the ZooKeeper HA mode will create namespace directories. 
Previously this ket was named `recovery.zookeeper.path.root`.
 
-- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in 
standalone cluster mode, or the  under YARN) Defines the 
subdirectory under the root dir where the ZooKeeper HA mode will create znodes. 
This allows to isolate multiple applications on the same ZooKeeper. Previously 
this key was named `recovery.zookeeper.path.namespace`.
+- `high-availability.cluster-id`: (Default `/default_ns` in standalone 
cluster mode, or the  under YARN) Defines the subdirectory 
under the root dir where the ZooKeeper HA mode will create znodes. This allows 
to isolate multiple applications on the same ZooKeeper. Previously this key was 
named `recovery.zookeeper.path.namespace` and 
`high-availability.zookeeper.path.namespace`.
--- End diff --

I would move these into the `### High Availability (HA)` section, because they 
are independent of ZooKeeper.


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930211#comment-15930211
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106680476
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1305,6 +1305,9 @@ class TaskManager(
 s"${task.getExecutionState} to JobManager for task 
${task.getTaskInfo.getTaskName} " +
 s"(${task.getExecutionId})")
 
+  // delete all NAME_ADDRESSABLE BLOBs
+  libraryCacheManager.get.getBlobService.deleteAll(task.getJobID)
--- End diff --

Multiple tasks of the same job run in a TaskManager. This means that tasks 
would delete each other's blobs.


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930209#comment-15930209
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674262
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws 
IOException {
 
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+   LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
// loop over retries
int attempt = 0;
while (true) {
+   try (
+   final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
+   final InputStream is = bc.get(requiredBlob);
+   final OutputStream os = new 
FileOutputStream(localJarFile)
+   ) {
+   getURLTransferFile(buf, is, os);
+
+   // success, we finished
+   return localJarFile.toURI().toURL();
+   }
+   catch (Throwable t) {
+   getURLOnException(requiredBlob.toString(), 
localJarFile, attempt, t);
 
-   if (attempt == 0) {
-   LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
-   } else {
+   // retry
+   ++attempt;
LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
}
+   } // end loop over retries
+   }
 
-   try {
-   BlobClient bc = null;
-   InputStream is = null;
-   OutputStream os = null;
+   /**
+* Returns the URL for the BLOB with the given parameters. The method 
will first attempt to
+* serve the BLOB from its local cache. If the BLOB is not in the 
cache, the method will try
+* to download it from this cache's BLOB server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+* @return URL referring to the local storage location of the BLOB.
+* @throws java.io.FileNotFoundException if the path does not exist;
+* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+*/
+   public URL getURL(final JobID jobId, final String key) throws 
IOException {
+   checkArgument(jobId != null, "Job id cannot be null.");
+   checkArgument(key != null, "BLOB name cannot be null.");
 
-   try {
-   bc = new BlobClient(serverAddress, 
blobClientConfig);
-   is = bc.get(requiredBlob);
-   os = new FileOutputStream(localJarFile);
-
-   while (true) {
-   final int read = is.read(buf);
-   if (read < 0) {
-   break;
-   }
-   os.write(buf, 0, read);
-   }
-
-   // we do explicitly not use a finally 
block, because we want the closing
-   // in the regular case to throw 
exceptions and cause the writing to fail.
-   // But, the closing on exception should 
not throw further exceptions and
-   // let us keep the root exception
-   os.close();
-   os = null;
-   is.close();
-   is = null;
-   bc.close();
-   bc = null;
-
-   // success, we finished
-   return localJarFile.toURI().toURL();
-   }
-   catch (Throwable t) {
-   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exception
- 

[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-17 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106684413
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and 
allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

That is what I remember as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930210#comment-15930210
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106674837
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws 
IOException {
 
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+   LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
// loop over retries
int attempt = 0;
while (true) {
+   try (
+   final BlobClient bc = new 
BlobClient(serverAddress, blobClientConfig);
+   final InputStream is = bc.get(requiredBlob);
+   final OutputStream os = new 
FileOutputStream(localJarFile)
+   ) {
+   getURLTransferFile(buf, is, os);
+
+   // success, we finished
+   return localJarFile.toURI().toURL();
+   }
+   catch (Throwable t) {
+   getURLOnException(requiredBlob.toString(), 
localJarFile, attempt, t);
 
-   if (attempt == 0) {
-   LOG.info("Downloading {} from {}", 
requiredBlob, serverAddress);
-   } else {
+   // retry
+   ++attempt;
LOG.info("Downloading {} from {} (retry {})", 
requiredBlob, serverAddress, attempt);
}
+   } // end loop over retries
+   }
 
-   try {
-   BlobClient bc = null;
-   InputStream is = null;
-   OutputStream os = null;
+   /**
+* Returns the URL for the BLOB with the given parameters. The method 
will first attempt to
+* serve the BLOB from its local cache. If the BLOB is not in the 
cache, the method will try
+* to download it from this cache's BLOB server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+* @return URL referring to the local storage location of the BLOB.
+* @throws java.io.FileNotFoundException if the path does not exist;
+* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+*/
+   public URL getURL(final JobID jobId, final String key) throws 
IOException {
+   checkArgument(jobId != null, "Job id cannot be null.");
+   checkArgument(key != null, "BLOB name cannot be null.");
 
-   try {
-   bc = new BlobClient(serverAddress, 
blobClientConfig);
-   is = bc.get(requiredBlob);
-   os = new FileOutputStream(localJarFile);
-
-   while (true) {
-   final int read = is.read(buf);
-   if (read < 0) {
-   break;
-   }
-   os.write(buf, 0, read);
-   }
-
-   // we do explicitly not use a finally 
block, because we want the closing
-   // in the regular case to throw 
exceptions and cause the writing to fail.
-   // But, the closing on exception should 
not throw further exceptions and
-   // let us keep the root exception
-   os.close();
-   os = null;
-   is.close();
-   is = null;
-   bc.close();
-   bc = null;
-
-   // success, we finished
-   return localJarFile.toURI().toURL();
-   }
-   catch (Throwable t) {
-   // we use "catch (Throwable)" to keep 
the root exception. Otherwise that exception
- 

[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930206#comment-15930206
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677990
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
}
 
/**
+* Deletes the file associated with the given job and key if it exists 
in the local
+* storage of the blob server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+*/
+   @Override
+   public void delete(JobID jobId, String key) {
+   checkArgument(jobId != null, "Job id must not be null.");
+   checkArgument(key != null, "BLOB name must not be null.");
+
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, key);
+
+   if (localFile.exists()) {
+   if (!localFile.delete()) {
+   LOG.warn("Failed to delete locally BLOB " + key 
+ " at " + localFile.getAbsolutePath());
+   }
+   }
+
+   blobStore.delete(jobId, key);
+   }
+
+   /**
+* Deletes all files associated with the given job id from the storage.
+*
+* @param jobId JobID of the files in the blob store
+*/
+   @Override
+   public void deleteAll(final JobID jobId) {
+   checkArgument(jobId != null, "Job id must not be null.");
+
+   try {
+   BlobUtils.deleteJobDirectory(storageDir, jobId);
+   } catch (IOException e) {
--- End diff --

If we want to make sure we clean up in any case, we can actually catch 
`Exception` here.


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930208#comment-15930208
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3512#discussion_r106677799
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -400,6 +418,47 @@ public void delete(BlobKey key) throws IOException {
}
 
/**
+* Deletes the file associated with the given job and key if it exists 
in the local
+* storage of the blob server.
+*
+* @param jobId JobID of the file in the blob store
+* @param key   String key of the file in the blob store
+*/
+   @Override
+   public void delete(JobID jobId, String key) {
+   checkArgument(jobId != null, "Job id must not be null.");
+   checkArgument(key != null, "BLOB name must not be null.");
+
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, key);
+
+   if (localFile.exists()) {
--- End diff --

For concurrency safety, it is better to do `if (!delete && exists)`


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config options with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399)
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930204#comment-15930204
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106684096
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and 
allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

I must have removed the check by accident. I think we agreed to rename this 
to something more meaningful and keep it, right?
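
A minimal sketch of what the renamed element-level check could look like, assuming 
the usual `WindowOperator` fields (`windowAssigner`, `allowedLateness`, 
`internalTimerService`); the exact name and condition are illustrative:

```java
// An element is considered late when its timestamp plus the allowed lateness is
// already behind the current event-time watermark.
protected boolean isElementLate(StreamRecord<IN> element) {
    return windowAssigner.isEventTime()
            && element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark();
}
```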


> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106684096
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and 
allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

I must have removed the check by accident. I think we agreed to rename this 
to something more meaningful and keep it, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930168#comment-15930168
 ] 

ASF GitHub Bot commented on FLINK-5544:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106680293
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService extends 
InternalTimerService {
+   
+   private static Logger LOG = 
LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+   
+   /** The data base where stores all timers */
+   private final RocksDB db;
+   
+   /** The path where the rocksdb locates */
+   private final Path dbPath;
+
+   /**
+* The in-memory heaps backed by rocksdb to retrieve the next timer to 
trigger. Each
+* partition's leader is stored in the heap. When the timers in a 
partition is changed, we
+* will change the partition's leader and update the heap accordingly.
+*/
+   private final int numPartitions;
+   private final PersistentTimerHeap eventTimeHeap;
+   private final PersistentTimerHeap processingTimeHeap;
+   
+   private static int MAX_PARTITIONS = (1 << 16);
+
+   public RocksDBInternalTimerService(
+   int totalKeyGroups,
+   KeyGroupRange keyGroupRange,
+   KeyContext keyContext,
+   ProcessingTimeService processingTimeService,
+   Path dbPath) {
+
+   super(totalKeyGroups, keyGroupRange, keyContext, 
processingTimeService);
+   
+   this.dbPath = dbPath;
+   
+   try {
+   FileSystem fileSystem = this.dbPath.getFileSystem();
+   if (fileSystem.exists(this.dbPath)) {
+   fileSystem.delete(this.dbPath, true);
+   }
+   
+   fileSystem.mkdirs(dbPath);
+   } catch (IOException e) {
+ 

[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930169#comment-15930169
 ] 

ASF GitHub Bot commented on FLINK-5544:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106672829
  
--- Diff: 
flink-contrib/flink-timerserivce-rocksdb/src/main/java/org/apache/flink/contrib/streaming/api/operators/RocksDBInternalTimerService.java
 ---
@@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.api.operators;
+
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link InternalTimerService} that stores timers in RocksDB.
+ */
+public class RocksDBInternalTimerService extends 
InternalTimerService {
+   
+   private static Logger LOG = 
LoggerFactory.getLogger(RocksDBInternalTimerService.class);
+   
+   /** The data base where stores all timers */
+   private final RocksDB db;
--- End diff --

I think that we should avoid creating more instances of RocksDB if we can. 
This makes native memory consumption more unpredictable and creates more files 
on snapshots. My suggestion is to do a refactoring so that timer services must be 
requested through a keyed state backend. The RocksDB backend could then re-use 
the same database instance as the keyed backend for the timer service, reducing 
all the overheads. I think this request should still allow asking for a 
RocksDB-based timer service even when using a `HeapKeyedStateBackend`, and 
vice versa.
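
A minimal sketch of the kind of factory being suggested, with purely illustrative 
names (not the API that was eventually added):

```java
// Sketch: the keyed state backend hands out timer services, so a RocksDB-based backend
// can back them with its one existing database instance, while a heap backend may still
// choose to return a RocksDB-backed (or heap-backed) timer service.
import org.apache.flink.api.common.typeutils.TypeSerializer;

interface TimerServiceProvidingBackend<K> {

    <N> InternalTimerService<K, N> createInternalTimerService(
            String serviceName,
            TypeSerializer<N> namespaceSerializer);
}
```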


> Implement Internal Timer Service in RocksDB
> ---
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now the only implementation of internal timer service is 
> HeapInternalTimerService which stores all timers in memory. In the cases 
> where the number of keys is very large, the timer service will cost too much 
> memory. An implementation which stores timers in RocksDB seems good to deal 
> with these cases.
> It might be a little challenging to implement a RocksDB timer service because 
> the timers are accessed in different ways. When timers are triggered, we need 
> to access timers in 

[GitHub] flink pull request #3359: [FLINK-5544][streaming] Add InternalTimerService i...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106668768
  
--- Diff: flink-contrib/flink-timerserivce-rocksdb/pom.xml ---
@@ -0,0 +1,80 @@
+
--- End diff --

I think we should simply integrate the RocksDB timer service into the project 
`flink-statebackend-rocksdb`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930193#comment-15930193
 ] 

ASF GitHub Bot commented on FLINK-5544:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3359
  
One additional comment, also as a reminder for @aljoscha and me: after this 
PR is rebased, we have access to `InternalKeyContext`, which should somehow be 
integrated with the already existing `KeyContext` interface. That would reduce 
several constructor parameters and some members, e.g. the `KeyGroupRange`s and 
`TypeSerializer`s.


> Implement Internal Timer Service in RocksDB
> ---
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now the only implementation of internal timer service is 
> HeapInternalTimerService which stores all timers in memory. In the cases 
> where the number of keys is very large, the timer service will cost too much 
> memory. An implementation which stores timers in RocksDB seems good to deal 
> with these cases.
> It might be a little challenging to implement a RocksDB timer service because 
> the timers are accessed in different ways. When timers are triggered, we need 
> to access timers in the order of timestamp. But when performing checkpoints, 
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of 
> merge sorting. We can store timers in RocksDB with the format 
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put 
> together and are sorted. 
> Then we can deploy an in-memory heap which keeps the first timer of each key 
> group to get the next timer to trigger. When a key group's first timer is 
> updated, we can efficiently update the heap.
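> A minimal sketch of that layout and heap, assuming a simple byte-wise ordered key 
> encoding (illustrative only, not the code in the PR):
> {code}
> import java.nio.ByteBuffer;
> import java.util.PriorityQueue;
> 
> class RocksDbTimerLayoutSketch {
> 
>     // Timers are stored under KEY_GROUP | TIMESTAMP | KEY; big-endian encoding keeps
>     // non-negative values in numeric order under RocksDB's byte-wise key comparison,
>     // so each key group's timers come out sorted by timestamp.
>     static byte[] timerKey(int keyGroup, long timestamp, byte[] serializedKey) {
>         return ByteBuffer.allocate(4 + 8 + serializedKey.length)
>                 .putInt(keyGroup)
>                 .putLong(timestamp)
>                 .put(serializedKey)
>                 .array();
>     }
> 
>     // The in-memory heap only holds the earliest timer of every key group, so the
>     // overall next timer to trigger is the heap's minimum. When a key group's first
>     // timer changes, only that one entry has to be re-positioned.
>     static class KeyGroupHead implements Comparable<KeyGroupHead> {
>         final int keyGroup;
>         final long earliestTimestamp;
> 
>         KeyGroupHead(int keyGroup, long earliestTimestamp) {
>             this.keyGroup = keyGroup;
>             this.earliestTimestamp = earliestTimestamp;
>         }
> 
>         @Override
>         public int compareTo(KeyGroupHead other) {
>             return Long.compare(earliestTimestamp, other.earliestTimestamp);
>         }
>     }
> 
>     final PriorityQueue<KeyGroupHead> heads = new PriorityQueue<>();
> }
> {code}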



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3359
  
One additional comment, also as a reminder for @aljoscha and me: after this 
PR is rebased, we have access to `InternalKeyContext`, which should somehow be 
integrated with the already existing `KeyContext` interface. That would reduce 
several constructor parameters and some members, e.g. the `KeyGroupRange`s and 
`TypeSerializer`s.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930179#comment-15930179
 ] 

ASF GitHub Bot commented on FLINK-5544:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106670490
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -18,43 +18,306 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Interface for working with time and timers.
  *
  * This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
  * that allows to specify a key and a namespace to which timers should be scoped.
  *
+ * All d
+ *
+ * @param <K> Type of the keys in the stream
  * @param <N> Type of the namespace to which timers are scoped.
  */
 @Internal
-public interface InternalTimerService<N> {
+public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
+
+	protected final ProcessingTimeService processingTimeService;
+
+	protected final KeyContext keyContext;
+
+	protected final int totalKeyGroups;
+
+	protected final KeyGroupRange keyGroupRange;
+
+	/**
+	 * The one and only Future (if any) registered to execute the
+	 * next {@link Triggerable} action, when its (processing) time arrives.
+	 */
+	protected ScheduledFuture<?> nextTimer;
+
+	/**
+	 * The local event time, as denoted by the last received
+	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
+	 */
+	private long currentWatermark = Long.MIN_VALUE;
+
+	// Variables to be set when the service is started.
+
+	protected TypeSerializer<K> keySerializer;
+
+	protected TypeSerializer<N> namespaceSerializer;
+
+	private InternalTimer.TimerSerializer<K, N> timerSerializer;
+
+	protected Triggerable<K, N> triggerTarget;
+
+	private volatile boolean isInitialized;
+
+	public InternalTimerService(
+			int totalKeyGroups,
+			KeyGroupRange keyGroupRange,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+
+		this.totalKeyGroups = totalKeyGroups;
+		this.keyGroupRange = checkNotNull(keyGroupRange);
+		this.keyContext = checkNotNull(keyContext);
+		this.processingTimeService = checkNotNull(processingTimeService);
+	}
 
 	/** Returns the current processing time. */
-	long currentProcessingTime();
+	public long currentProcessingTime() {
+		return processingTimeService.getCurrentProcessingTime();
+	}
 
 	/** Returns the current event-time watermark. */
-	long currentWatermark();
+	public long currentWatermark() {
+		return currentWatermark;
+	}
 
 	/**
 	 * Registers a timer to be fired when processing time passes the given time. The namespace
 	 * you pass here will be provided when the timer fires.
 	 */
-	void registerProcessingTimeTimer(N namespace, long time);
+	abstract public void registerProcessingTimeTimer(N namespace, long time);
 
 	/**
 	 * Deletes the timer for the given key and namespace.
 	 */
-	void deleteProcessingTimeTimer(N namespace, long time);
+	abstract public void deleteProcessingTimeTimer(N namespace, long time);
 
 	/**
 	 * Registers a timer to be fired when processing time passes the given time. The namespace
 	 * you pass here will be provided when the timer 
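
The methods quoted above define the API surface that operators program against: query the current processing time and the current watermark, and register or delete processing-time timers per namespace. A minimal usage sketch, assuming the <K, N> parameterization shown in the class declaration; the helper class and method names below are illustrative only and not part of the pull request:

{code}
import org.apache.flink.streaming.api.operators.InternalTimerService;

/**
 * Illustrative only -- not part of the PR. Exercises the methods visible in the
 * quoted diff: querying time and registering/deleting a processing-time timer.
 */
public final class TimerServiceUsageSketch {

	static <K, N> long registerAndCancel(InternalTimerService<K, N> timers, N namespace) {
		// Current processing time, as delegated to the ProcessingTimeService.
		long now = timers.currentProcessingTime();

		// Register a timer one minute from now, scoped to the given namespace.
		timers.registerProcessingTimeTimer(namespace, now + 60_000L);

		// The same namespace and timestamp identify the timer again for deletion.
		timers.deleteProcessingTimeTimer(namespace, now + 60_000L);

		// Event-time progress is exposed as the last received watermark.
		return timers.currentWatermark();
	}
}
{code}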

[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930174#comment-15930174
 ] 

ASF GitHub Bot commented on FLINK-5544:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3359#discussion_r106671203
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 ---
@@ -18,43 +18,306 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Interface for working with time and timers.
  *
  * This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
  * that allows to specify a key and a namespace to which timers should be scoped.
  *
+ * All d
+ *
+ * @param <K> Type of the keys in the stream
  * @param <N> Type of the namespace to which timers are scoped.
  */
 @Internal
-public interface InternalTimerService<N> {
+public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
+
+	protected final ProcessingTimeService processingTimeService;
+
+	protected final KeyContext keyContext;
+
+	protected final int totalKeyGroups;
+
+	protected final KeyGroupRange keyGroupRange;
+
+	/**
+	 * The one and only Future (if any) registered to execute the
+	 * next {@link Triggerable} action, when its (processing) time arrives.
+	 */
+	protected ScheduledFuture<?> nextTimer;
+
+	/**
+	 * The local event time, as denoted by the last received
+	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
+	 */
+	private long currentWatermark = Long.MIN_VALUE;
+
+	// Variables to be set when the service is started.
+
+	protected TypeSerializer<K> keySerializer;
+
+	protected TypeSerializer<N> namespaceSerializer;
+
+	private InternalTimer.TimerSerializer<K, N> timerSerializer;
+
+	protected Triggerable<K, N> triggerTarget;
+
+	private volatile boolean isInitialized;
+
+	public InternalTimerService(
+			int totalKeyGroups,
+			KeyGroupRange keyGroupRange,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+
+		this.totalKeyGroups = totalKeyGroups;
+		this.keyGroupRange = checkNotNull(keyGroupRange);
+		this.keyContext = checkNotNull(keyContext);
+		this.processingTimeService = checkNotNull(processingTimeService);
+	}
 
 	/** Returns the current processing time. */
-	long currentProcessingTime();
+	public long currentProcessingTime() {
+		return processingTimeService.getCurrentProcessingTime();
+	}
 
 	/** Returns the current event-time watermark. */
-	long currentWatermark();
+	public long currentWatermark() {
+		return currentWatermark;
+	}
 
 	/**
 	 * Registers a timer to be fired when processing time passes the given time. The namespace
 	 * you pass here will be provided when the timer fires.
 	 */
-	void registerProcessingTimeTimer(N namespace, long time);
+	abstract public void registerProcessingTimeTimer(N namespace, long time);
 
 	/**
 	 * Deletes the timer for the given key and namespace.
 	 */
-	void deleteProcessingTimeTimer(N namespace, long time);
+	abstract public void deleteProcessingTimeTimer(N namespace, long time);
 
 	/**
 	 * Registers a timer to be fired when processing time passes the given time. The namespace
 	 * you pass here will be provided when the timer 
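
The nextTimer field and the ProcessingTimeCallback interface implemented by the abstract class suggest the scheduling pattern for processing time: the service keeps at most one physical timer registered and re-registers itself whenever the earliest pending timer changes. A minimal sketch of that pattern, assuming ProcessingTimeService.registerTimer(long, ProcessingTimeCallback) is the physical scheduling hook; the class below is illustrative only and not the subclass code from the pull request:

{code}
import java.util.concurrent.ScheduledFuture;

import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

/**
 * Illustrative only -- not the PR's subclass code. Sketches the scheduling pattern
 * suggested by the "nextTimer" field: at most one physical timer is registered with
 * the ProcessingTimeService, re-registered whenever the earliest pending timer changes.
 */
final class NextTimerSchedulingSketch implements ProcessingTimeCallback {

	private final ProcessingTimeService processingTimeService;
	private ScheduledFuture<?> nextTimer;

	NextTimerSchedulingSketch(ProcessingTimeService processingTimeService) {
		this.processingTimeService = processingTimeService;
	}

	void rescheduleTo(long nextTriggerTime) {
		// Cancel the previously scheduled physical timer, if any.
		if (nextTimer != null) {
			nextTimer.cancel(false);
		}
		// Register this callback; onProcessingTime(...) fires once processing
		// time passes nextTriggerTime.
		nextTimer = processingTimeService.registerTimer(nextTriggerTime, this);
	}

	@Override
	public void onProcessingTime(long timestamp) throws Exception {
		// A real implementation would fire all timers with timestamp <= the given
		// time, then reschedule for the next pending timer.
	}
}
{code}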
