[jira] [Resolved] (FLINK-10145) Add replace supported in TableAPI and SQL

2018-09-26 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui resolved FLINK-10145.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Implemented in 1.7.0: 24af70fdecbbb66e8555df7aca35a92a2f1aa7ac

> Add replace supported in TableAPI and SQL
> -
>
> Key: FLINK-10145
> URL: https://issues.apache.org/jira/browse/FLINK-10145
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Guibo Pan
>Assignee: Guibo Pan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{replace}} is a useful function for strings. 
> For example:
> {code:java}
> select replace('Hello World', 'World', 'Flink') -- returns 'Hello Flink'
> select replace('ababab', 'abab', 'z') -- returns 'zab'
> {code}
> It is supported as a UDF in Hive; for more details, see [1].
> [1]: 
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-StringFunctions
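
As a quick illustration of the new function, a hedged usage sketch through the 
SQL API (the table and column names, "clicks" and "msg", are made up):

{code:java}
// assumes a TableEnvironment `tableEnv` with a registered table "clicks"
Table result = tableEnv.sqlQuery(
    "SELECT REPLACE(msg, 'World', 'Flink') AS msg FROM clicks");
{code}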



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL

2018-09-26 Thread GitBox
asfgit closed pull request #6576: [FLINK-10145][table] Add replace supported in 
TableAPI and SQL
URL: https://github.com/apache/flink/pull/6576
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 11082941385..00465727399 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2448,6 +2448,18 @@ SUBSTRING(string FROM integer1 [ FOR integer2 ])
   
 
 
+
+  
+{% highlight text %}
+REPLACE(string1, string2, string3)
+{% endhighlight %}
+  
+  
+Returns a new string which replaces string2 with string3 (non-overlapping) from string1
+E.g., REPLACE("hello world", "world", "flink") returns "hello flink"; REPLACE("ababab", "abab", "z") returns "zab".
+  
+
+
 
   
 {% highlight text %}
@@ -2688,6 +2700,18 @@ STRING.substring(INT1, INT2)
   
 
 
+
+  
+{% highlight java %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+  
+  
+Returns a new string which replaces STRING2 with STRING3 (non-overlapping) from STRING1.
+E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns 'zab'.
+  
+
+
 
   
 {% highlight java %}
@@ -2927,6 +2951,18 @@ STRING.substring(INT1, INT2)
   
 
 
+
+  
+{% highlight scala %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+  
+  
+Returns a new string which replaces STRING2 with STRING3 (non-overlapping) from STRING1.
+E.g., "hello world".replace("world", "flink") returns "hello flink"; "ababab".replace("abab", "z") returns "zab".
+  
+
+
 
   
 {% highlight scala %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 14adb71bbab..3c726678b24 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -471,6 +471,13 @@ trait ImplicitExpressionOperations {
 }
   }
 
+  /**
+* Creates a new string of the given string with non-overlapping occurrences
+* of given search replaced with replacement.
+*/
+  def replace(search: Expression, replacement: Expression) =
+Replace(expr, search, replacement)
+
   /**
 * Returns the length of a string.
 */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index e32005213eb..bd05f9bd71e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -153,6 +153,12 @@ object FunctionGenerator {
 STRING_TYPE_INFO,
 BuiltInMethods.REGEXP_REPLACE)
 
+  addSqlFunctionMethod(
+REPLACE,
+Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
+STRING_TYPE_INFO,
+BuiltInMethod.REPLACE.method)
+
   addSqlFunctionMethod(
 FROM_BASE64,
 Seq(STRING_TYPE_INFO),
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index 5734b369fdf..f5e6ad2fff3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -511,3 +511,27 @@ case class Repeat(str: Expression, n: Expression) extends Expression with InputT
 
   override def toString: String = s"($str).repeat($n)"
 }
+
+/**
+  * Returns the string `str` with all non-overlapping occurrences
+  * of `search` replaced with `replacement`.
+  */
+case class Replace(str: Expression,
+    search: Expression,
+    replacement: Expression) extends Expression with InputTypeSpec {
+
+  def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str))
+
+  override private[flink] def children: Seq[Expression] = str :: search :: replacement :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+  

[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629792#comment-16629792
 ] 

ASF GitHub Bot commented on FLINK-10145:


asfgit closed pull request #6576: [FLINK-10145][table] Add replace supported in 
TableAPI and SQL
URL: https://github.com/apache/flink/pull/6576
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 11082941385..00465727399 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2448,6 +2448,18 @@ SUBSTRING(string FROM integer1 [ FOR integer2 ])
   
 
 
+
+  
+{% highlight text %}
+REPLACE(string1, string2, string3)
+{% endhighlight %}
+  
+  
+Returns a new string which replaces string2 with string3 (non-overlapping) from string1
+E.g., REPLACE("hello world", "world", "flink") returns "hello flink"; REPLACE("ababab", "abab", "z") returns "zab".
+  
+
+
 
   
 {% highlight text %}
@@ -2688,6 +2700,18 @@ STRING.substring(INT1, INT2)
   
 
 
+
+  
+{% highlight java %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+  
+  
+Returns a new string which replaces STRING2 with STRING3 (non-overlapping) from STRING1.
+E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns 'zab'.
+  
+
+
 
   
 {% highlight java %}
@@ -2927,6 +2951,18 @@ STRING.substring(INT1, INT2)
   
 
 
+
+  
+{% highlight scala %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+  
+  
+Returns a new string which replaces STRING2 with STRING3 (non-overlapping) from STRING1.
+E.g., "hello world".replace("world", "flink") returns "hello flink"; "ababab".replace("abab", "z") returns "zab".
+  
+
+
 
   
 {% highlight scala %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 14adb71bbab..3c726678b24 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -471,6 +471,13 @@ trait ImplicitExpressionOperations {
 }
   }
 
+  /**
+* Creates a new string of the given string with non-overlapping occurrences
+* of given search replaced with replacement.
+*/
+  def replace(search: Expression, replacement: Expression) =
+Replace(expr, search, replacement)
+
   /**
 * Returns the length of a string.
 */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index e32005213eb..bd05f9bd71e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -153,6 +153,12 @@ object FunctionGenerator {
 STRING_TYPE_INFO,
 BuiltInMethods.REGEXP_REPLACE)
 
+  addSqlFunctionMethod(
+REPLACE,
+Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
+STRING_TYPE_INFO,
+BuiltInMethod.REPLACE.method)
+
   addSqlFunctionMethod(
 FROM_BASE64,
 Seq(STRING_TYPE_INFO),
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index 5734b369fdf..f5e6ad2fff3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -511,3 +511,27 @@ case class Repeat(str: Expression, n: Expression) extends Expression with InputT
 
   override def toString: String = s"($str).repeat($n)"
 }
+
+/**
+  * Returns the string `str` with all non-overlapping occurrences
+  * of `search` replaced with `replacement`.
+  */
+case class Replace(str: Expression,
+    search: Expression,
+    replacement: Expression) extends Expression with InputTypeSpec {
+
+  def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str))
+
+  override private[flink] def children:

[jira] [Comment Edited] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component

2018-09-26 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629769#comment-16629769
 ] 

tison edited comment on FLINK-10429 at 9/27/18 5:18 AM:


I like the proposal to treat scheduling as a separate component. It is helpful 
for further optimization of scheduling, such as FLINK-10240.

To [~zhuzh], [~tiemsn]:
I think the first step of this redesign would be the extraction part; maybe you 
can take this document into consideration? Also, the link to FLINK-10240 is 
broken; the correct one is 
https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/
 (without the "edit" part). I am afraid that this document being READ-ONLY will 
prevent further discussion, since no thoughts can be commented on it.

For context, our users have started looking for more flexible scheduling 
strategies. I hope this redesign can help.


was (Author: tison):
I like the proposal to treat scheduling as a separate component. It is helpful 
for further optimization of scheduling, such as FLINK-10240.

To [~zhuzh], [~tiemsn]:
I think the first step of this redesign would be the extraction part; maybe you 
can take this document into consideration? Also, the link to FLINK-10240 is 
broken; the correct one is 
https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/
 . I am afraid that this document being READ-ONLY will prevent further 
discussion, since no thoughts can be commented on it.

For context, our users have started looking for more flexible scheduling 
strategies. I hope this redesign can help.

> Redesign Flink Scheduling, introducing dedicated Scheduler component
> 
>
> Key: FLINK-10429
> URL: https://issues.apache.org/jira/browse/FLINK-10429
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> This epic tracks the redesign of scheduling in Flink. Scheduling is currently 
> a concern that is scattered across different components, mainly the 
> ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on 
> the granularity of individual tasks, which make holistic scheduling 
> strategies hard to implement. In this epic we aim to introduce a dedicated 
> Scheduler component that can support use-case like auto-scaling, 
> local-recovery, and resource optimized batch.
> The design for this feature is developed here: 
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component

2018-09-26 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629769#comment-16629769
 ] 

tison edited comment on FLINK-10429 at 9/27/18 5:18 AM:


I like the proposal to treat scheduling as a separate component. It is helpful 
for further optimization of scheduling, such as FLINK-10240.

To [~zhuzh], [~tiemsn]:
I think the first step of this redesign would be the extraction part; maybe you 
can take this document into consideration? Also, the link to FLINK-10240 is 
broken; the correct one is 
https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/
 (without the "edit" part). I am afraid that this document being READ-ONLY will 
prevent further discussion, since no thoughts can be commented on it.

For context, our users have started looking for more flexible scheduling 
strategies [1]. I hope this redesign can help.

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Scheduling-sources-td23344.html
 


was (Author: tison):
I like the proposal to treat scheduling as a separate component. It is helpful 
for further optimization of scheduling, such as FLINK-10240.

To [~zhuzh], [~tiemsn]:
I think the first step of this redesign would be the extraction part; maybe you 
can take this document into consideration? Also, the link to FLINK-10240 is 
broken; the correct one is 
https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/
 (without the "edit" part). I am afraid that this document being READ-ONLY will 
prevent further discussion, since no thoughts can be commented on it.

For context, our users have started looking for more flexible scheduling 
strategies. I hope this redesign can help.

> Redesign Flink Scheduling, introducing dedicated Scheduler component
> 
>
> Key: FLINK-10429
> URL: https://issues.apache.org/jira/browse/FLINK-10429
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> This epic tracks the redesign of scheduling in Flink. Scheduling is currently 
> a concern that is scattered across different components, mainly the 
> ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on 
> the granularity of individual tasks, which make holistic scheduling 
> strategies hard to implement. In this epic we aim to introduce a dedicated 
> Scheduler component that can support use-case like auto-scaling, 
> local-recovery, and resource optimized batch.
> The design for this feature is developed here: 
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component

2018-09-26 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629769#comment-16629769
 ] 

tison commented on FLINK-10429:
---

I like the proposal to treat scheduling as a separate component. It is helpful 
for further optimization of scheduling, such as FLINK-10240.

To [~zhuzh], [~tiemsn]:
I think the first step of this redesign would be the extraction part; maybe you 
can take this document into consideration? Also, the link to FLINK-10240 is 
broken; the correct one is 
https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/
 . I am afraid that this document being READ-ONLY will prevent further 
discussion, since no thoughts can be commented on it.

For context, our users have started looking for more flexible scheduling 
strategies. I hope this redesign can help.

> Redesign Flink Scheduling, introducing dedicated Scheduler component
> 
>
> Key: FLINK-10429
> URL: https://issues.apache.org/jira/browse/FLINK-10429
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> This epic tracks the redesign of scheduling in Flink. Scheduling is currently 
> a concern that is scattered across different components, mainly the 
> ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on 
> the granularity of individual tasks, which make holistic scheduling 
> strategies hard to implement. In this epic we aim to introduce a dedicated 
> Scheduler component that can support use-case like auto-scaling, 
> local-recovery, and resource optimized batch.
> The design for this feature is developed here: 
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component

2018-09-26 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629735#comment-16629735
 ] 

shuai.xu commented on FLINK-10429:
--

+1

This proposal goes in the same direction as our approach. We have already 
initiated a discussion about how to make the scheduling strategy pluggable in 
[https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/edit].
 I think we can make the scheduler more powerful together.

> Redesign Flink Scheduling, introducing dedicated Scheduler component
> 
>
> Key: FLINK-10429
> URL: https://issues.apache.org/jira/browse/FLINK-10429
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> This epic tracks the redesign of scheduling in Flink. Scheduling is currently 
> a concern that is scattered across different components, mainly the 
> ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on 
> the granularity of individual tasks, which make holistic scheduling 
> strategies hard to implement. In this epic we aim to introduce a dedicated 
> Scheduler component that can support use-case like auto-scaling, 
> local-recovery, and resource optimized batch.
> The design for this feature is developed here: 
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629718#comment-16629718
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-424951099
 
 
   cc @tillrohrmann @StephanEwen could you help advance this PR?
   Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Assignee: Guibo Pan
>Priority: Major
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data overall, but at any given moment the 
> amount of data in each operator is skewed.
> In particular, when the data rates of the upstream operators are equal, the 
> data skew becomes severe.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (st1-st3 and st4-st6, 
> respectively).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2, because 
> they all use the same initial partition id.
> The next time map1 receives data, st1-st3 all send to st5, because each of 
> them increments its partition id after processing the previous data.
> In my environment, it takes twice as long to process data when I use 
> RebalancePartitioner as when I use other partitioners (rescale, keyBy).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value; a sketch of the idea follows below.
>  
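
A minimal sketch of the behavior the PR proposes (random first channel, then 
round-robin); this is an illustration, not the actual Flink code:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical channel selector: seed the round-robin with a random channel
// so parallel senders do not all start at channel 0 and skew one subtask.
public class RandomStartRoundRobinSelector {

    private int nextChannel = -1;

    public int selectChannel(int numberOfChannels) {
        if (nextChannel < 0) {
            // first record: pick a random starting channel per sender
            nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
        } else {
            // afterwards: plain round-robin
            nextChannel = (nextChannel + 1) % numberOfChannels;
        }
        return nextChannel;
    }
}
{code}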



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629716#comment-16629716
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-424951099
 
 
   cc @tillrohrmann @StephanEwen could you help advance this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Assignee: Guibo Pan
>Priority: Major
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data overall, but at any given moment the 
> amount of data in each operator is skewed.
> In particular, when the data rates of the upstream operators are equal, the 
> data skew becomes severe.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (st1-st3 and st4-st6, 
> respectively).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2, because 
> they all use the same initial partition id.
> The next time map1 receives data, st1-st3 all send to st5, because each of 
> them increments its partition id after processing the previous data.
> In my environment, it takes twice as long to process data when I use 
> RebalancePartitioner as when I use other partitioners (rescale, keyBy).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-09-26 Thread GitBox
Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-424951099
 
 
   cc @tillrohrmann @StephanEwen could you help advance this PR?
   Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-09-26 Thread GitBox
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-424951099
 
 
   cc @tillrohrmann @StephanEwen could you help advance this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10418) Add COTH math function supported in Table API and SQL

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629709#comment-16629709
 ] 

ASF GitHub Bot commented on FLINK-10418:


yanghua commented on a change in pull request #6764: [FLINK-10418] [Table API & 
SQL] Add COTH math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6764#discussion_r220784635
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1230,6 +1230,17 @@ COT(numeric)
   
 
 
+
+  
+{% highlight text %}
+COTH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic cotangent of a numeric.
 
 Review comment:
   Please add some information about the returned type; refer to @pnowojski's 
review suggestion in #6700.
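
For reference, the function's semantics: coth(x) = cosh(x) / sinh(x) = 
1 / tanh(x), undefined at x = 0. A minimal sketch via the JDK (the DOUBLE 
return type is an assumption based on the existing COT function):

{code:java}
double x = 2.0;
double coth = 1.0 / Math.tanh(x); // ~= 1.0373
{code}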


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add COTH math function supported in Table API and SQL
> -
>
> Key: FLINK-10418
> URL: https://issues.apache.org/jira/browse/FLINK-10418
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Aleksei Izmalkin
>Priority: Minor
>  Labels: pull-request-available
>
> Inspired by FLINK-10398



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6764: [FLINK-10418] [Table API & SQL] Add COTH math function supported in Table API and SQL

2018-09-26 Thread GitBox
yanghua commented on a change in pull request #6764: [FLINK-10418] [Table API & 
SQL] Add COTH math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6764#discussion_r220784635
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1230,6 +1230,17 @@ COT(numeric)
   
 
 
+
+  
+{% highlight text %}
+COTH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic cotangent of a numeric.
 
 Review comment:
   Please add some information about the returned type; refer to @pnowojski's 
review suggestion in #6700.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629684#comment-16629684
 ] 

ASF GitHub Bot commented on FLINK-10386:


TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220779905
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-	assertTrue(task.getExecutionState() == ExecutionState.FAILED);
 
 Review comment:
   Oh, I get your point; sorry for the delay. This is a matter of style, and I 
think both are OK. If there is a mandatory requirement not to change it, I am 
OK to revert.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Sub-task
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, the {{TaskExecutionStateListener}} approach has been abandoned 
> and no component relies on it. Instead, {{TaskManagerActions}} was introduced 
> to handle the communication of {{Task}} with {{TaskManager}}. Nothing except 
> the {{TaskManager}} should communicate directly with a {{Task}}, so the 
> legacy class {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-26 Thread GitBox
TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220779905
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 ##
 @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
 
task.failExternally(new Exception("test"));
-	assertTrue(task.getExecutionState() == ExecutionState.FAILED);
 
 Review comment:
   Oh, I get your point; sorry for the delay. This is a matter of style, and I 
think both are OK. If there is a mandatory requirement not to change it, I am 
OK to revert.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629681#comment-16629681
 ] 

ASF GitHub Bot commented on FLINK-10386:


TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220779551
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -846,6 +817,23 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
	//  Test Utilities
	// 

+	private static void waitUntilExecutionState(Task task, ExecutionState exceptedState, Deadline deadline) {
+		while (deadline.hasTimeLeft()) {
+			if (exceptedState == task.getExecutionState()) {
+				return;
+			}
+
+			try {
+				Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200));
 
 Review comment:
   Thanks! I changed the waiting style to a notify style; it should be more stable.
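
A minimal sketch of the notify-style waiting referred to here (hypothetical 
helper, using a CountDownLatch instead of polling in a sleep loop):

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// The task's state-transition hook calls reachedState.countDown() when the
// expected ExecutionState is reached; the test simply awaits it.
static void awaitState(CountDownLatch reachedState) throws InterruptedException {
    if (!reachedState.await(30, TimeUnit.SECONDS)) {
        throw new AssertionError("Task did not reach the expected state in time");
    }
}
{code}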


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Sub-task
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, the {{TaskExecutionStateListener}} approach has been abandoned 
> and no component relies on it. Instead, {{TaskManagerActions}} was introduced 
> to handle the communication of {{Task}} with {{TaskManager}}. Nothing except 
> the {{TaskManager}} should communicate directly with a {{Task}}, so the 
> legacy class {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-26 Thread GitBox
TisonKun commented on a change in pull request #6729: [FLINK-10386] 
[taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#discussion_r220779551
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -846,6 +817,23 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
	//  Test Utilities
	// 

+	private static void waitUntilExecutionState(Task task, ExecutionState exceptedState, Deadline deadline) {
+		while (deadline.hasTimeLeft()) {
+			if (exceptedState == task.getExecutionState()) {
+				return;
+			}
+
+			try {
+				Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200));
 
 Review comment:
   Thanks! I changed the waiting style to a notify style; it should be more stable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629674#comment-16629674
 ] 

ASF GitHub Bot commented on FLINK-10339:


zhijiangW commented on issue #6762: [FLINK-10339][network] Use off-heap memory 
for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762#issuecomment-424935982
 
 
   Thanks for your review, @StephanEwen!
   
   That is a good idea, adding `allocateUnpooledOffHeapMemory` to the factory, 
as it also makes it easy to use in `MemoryManager` and `NetworkBufferPool`. I 
have already updated the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SpillReadBufferPool cannot use off-heap memory
> --
>
> Key: FLINK-10339
> URL: https://issues.apache.org/jira/browse/FLINK-10339
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce 
> memory copies from Flink {{Buffer}}s to Netty's internal {{ByteBuf}}s during 
> transport on the sender side.
>  
> But the {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses 
> heap memory for caching. We can make it off-heap by default, similar to 
> {{NetworkBufferPool}}, or decide the type via the existing parameter 
> {{taskmanager.memory.off-heap}} (see the illustration below).
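
For background, the heap vs. off-heap distinction at the JDK level (this is 
illustration only, not the PR's code):

{code:java}
import java.nio.ByteBuffer;

// Heap buffer: backed by a byte[] on the JVM heap; the network stack must
// copy its contents into native memory before a socket write.
ByteBuffer heap = ByteBuffer.allocate(32 * 1024);

// Direct buffer: native memory outside the JVM heap; it can be handed to
// the network stack without that extra copy.
ByteBuffer offHeap = ByteBuffer.allocateDirect(32 * 1024);
{code}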



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhijiangW commented on issue #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool

2018-09-26 Thread GitBox
zhijiangW commented on issue #6762: [FLINK-10339][network] Use off-heap memory 
for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762#issuecomment-424935982
 
 
   Thanks for your review, @StephanEwen!
   
   That is a good idea, adding `allocateUnpooledOffHeapMemory` to the factory, 
as it also makes it easy to use in `MemoryManager` and `NetworkBufferPool`. I 
have already updated the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8594) Make producesUpdates in DataStreamGroupAggregate return false if it is a distinct group by without state retention configuration

2018-09-26 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629671#comment-16629671
 ] 

Hequn Cheng commented on FLINK-8594:


Hi [~twalthr], you are right: {{SELECT DISTINCT}} should produce an append 
stream. The main problem is that we have to output data to reset the state 
clean-up timers of the downstream operators when a retention time has been set.
Sorry that I haven't started on the problem yet. To solve it, I will try to 
address [FLINK-8566|https://issues.apache.org/jira/browse/FLINK-8566] first. 
What do you think?

> Make producesUpdates in DataStreamGroupAggregate return false if it is a 
> distinct group by without state retention configuration 
> -
>
> Key: FLINK-8594
> URL: https://issues.apache.org/jira/browse/FLINK-8594
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Internally, {{DISTINCT}} is translated into a {{GROUP BY}} with all distinct 
> fields being keys and no aggregation functions. However, this kind of 
> {{GROUP BY}} doesn't generate updates at all if the state retention time has 
> not been configured, so we can treat the result table as an append table. 
> Therefore, we can make the {{producesUpdates}} function return false, so that 
> the downstream group-by will not choose the corresponding retract aggregate 
> function, i.e., {{MaxWithRetractAggFunction}} (see the example below).
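
To illustrate the rewrite described above (assuming a {{TableEnvironment}} 
{{tableEnv}}; table and column names are made up):

{code:java}
// SELECT DISTINCT is planned as a GROUP BY over all selected fields
// with no aggregate functions:
Table distinct = tableEnv.sqlQuery("SELECT DISTINCT user_id FROM clicks");

// so it is planned the same way as:
Table grouped = tableEnv.sqlQuery("SELECT user_id FROM clicks GROUP BY user_id");
{code}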



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component

2018-09-26 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629571#comment-16629571
 ] 

JIN SUN commented on FLINK-10429:
-

+1 

I like the proposal and have put some comments in the document.

> Redesign Flink Scheduling, introducing dedicated Scheduler component
> 
>
> Key: FLINK-10429
> URL: https://issues.apache.org/jira/browse/FLINK-10429
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> This epic tracks the redesign of scheduling in Flink. Scheduling is currently 
> a concern that is scattered across different components, mainly the 
> ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on 
> the granularity of individual tasks, which make holistic scheduling 
> strategies hard to implement. In this epic we aim to introduce a dedicated 
> Scheduler component that can support use-case like auto-scaling, 
> local-recovery, and resource optimized batch.
> The design for this feature is developed here: 
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10446) Use the "guava beta checker" plugin to keep off of @Beta API

2018-09-26 Thread Ted Yu (JIRA)
Ted Yu created FLINK-10446:
--

 Summary: Use the "guava beta checker" plugin to keep off of @Beta 
API
 Key: FLINK-10446
 URL: https://issues.apache.org/jira/browse/FLINK-10446
 Project: Flink
  Issue Type: Task
Reporter: Ted Yu


The Guava project publishes an Error Prone plugin to detect when anything 
annotated with @Beta gets used. Such APIs shouldn't be used because the 
project makes no promises about deprecating them before removal.

plugin:

https://github.com/google/guava-beta-checker



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10288) Failover Strategy improvement

2018-09-26 Thread JIN SUN (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JIN SUN updated FLINK-10288:

Fix Version/s: 1.7.0

> Failover Strategy improvement
> -
>
> Key: FLINK-10288
> URL: https://issues.apache.org/jira/browse/FLINK-10288
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
> Fix For: 1.7.0
>
>
> Flink pays significant effort to make streaming jobs fault tolerant. The 
> checkpoint mechanism and exactly-once semantics make Flink different from 
> other systems. However, there are still some cases that are not handled very 
> well. Those cases apply to both streaming and batch scenarios, and they are 
> orthogonal to the current fault-tolerance mechanism. Here is a summary of 
> those cases:
>  # Some failures are non-recoverable, such as a user error: 
> DivideByZeroException. We shouldn't try to restart the task, as it will never 
> succeed. The DivideByZeroException is just a simple case; such errors are 
> sometimes not easy to reproduce or predict, as they might only be triggered 
> by specific input data, so we shouldn't retry for all user-code exceptions.
>  # There is no limit on task retries today: unless a SuppressRestartException 
> is encountered, a task will keep retrying until it succeeds. As mentioned 
> above, we shouldn't retry at all in some cases, and for the exceptions we can 
> retry on, such as a network exception, should we have a retry limit? We need 
> to retry on any transient issue, but we also need to set a limit to avoid 
> infinite retries and resource waste. For batch and streaming workloads, we 
> might need different strategies.
>  # Some exceptions are due to hardware issues, such as disk/network 
> malfunction. When a task/TaskManager fails because of these, we had better 
> detect that and avoid scheduling to that machine next time.
>  # If a task reads from a blocking result partition and its input is not 
> available, we can 'revoke' the producer task: set the task to failed and 
> rerun the upstream task to regenerate the data. The revoke can propagate up 
> through the chain. In Spark, revoke is naturally supported by lineage.
> To make fault tolerance easier, we need to keep behavior as deterministic as 
> possible. For user code this is not easy to control; however, for 
> system-related code we can fix it. For example, we should at least make sure 
> that different attempts of the same task get the same inputs (we have a bug 
> in the current codebase (DataSourceTask) that cannot guarantee this). Note 
> that this is tracked by [FLINK-10205].
> For details, see this proposal:
> [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10298) Batch Job Failover Strategy

2018-09-26 Thread JIN SUN (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JIN SUN updated FLINK-10298:

Fix Version/s: 1.7.0

> Batch Job Failover Strategy
> ---
>
> Key: FLINK-10298
> URL: https://issues.apache.org/jira/browse/FLINK-10298
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
> Fix For: 1.7.0
>
>
> The new failover strategy needs to consider handling failures according to 
> different failure types. It orchestrates all the logic we mentioned in this 
> [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit];
>  we can put the logic in the onTaskFailure method of the FailoverStrategy 
> interface, with the logic inline (a fleshed-out sketch follows the block):
> {code:java}
> public void onTaskFailure(Execution taskExecution, Throwable cause) {
>     //1. Get the throwable type
>     //2. If the type is NonRecoverableType, fail the job
>     //3. If the type is PartitionDataMissingError, do revocation
>     //4. If the type is EnvironmentError, check the blacklist
>     //5. Other failure types are recoverable, but we need to remember the 
>     //   count of failures;
>     //6. if it exceeds the threshold, fail the job
> }{code}
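
A hedged sketch of what that inline logic might look like; the ThrowableType 
values come from the sibling issue FLINK-10289, and every helper method here 
(classify, failJob, etc.) is hypothetical:

{code:java}
public void onTaskFailure(Execution taskExecution, Throwable cause) {
    ThrowableType type = classify(cause); // hypothetical classifier

    switch (type) {
        case NON_RECOVERABLE:
            failJob(cause);                        // never retry user errors
            break;
        case PARTITION_DATA_MISSING_ERROR:
            revokeAndRerunProducer(taskExecution); // regenerate lost input
            break;
        case ENVIRONMENT_ERROR:
            maybeBlacklistMachine(taskExecution);  // avoid the bad host
            restartTask(taskExecution);
            break;
        default: // RECOVERABLE
            if (incrementFailureCount(taskExecution) > maxRetries) {
                failJob(cause);
            } else {
                restartTask(taskExecution);
            }
    }
}
{code}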



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-26 Thread JIN SUN (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JIN SUN updated FLINK-10289:

Fix Version/s: 1.7.0

> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following throwable types and the 
> corresponding exceptions (a sketch of the enum follows this list):
>  * NonRecoverable
>  ** We shouldn't retry if an exception was classified as NonRecoverable.
>  ** For example, NoResourceAvailableException is a NonRecoverable exception.
>  ** Introduce a new exception UserCodeException to wrap all exceptions 
> thrown from user code.
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all such 
> issues.
>  * EnvironmentError
>  ** This happens due to hardware or software issues that are related to a 
> specific environment. The assumption is that a task will succeed if we run it 
> in a different environment, and other tasks run in this bad environment will 
> very likely fail. If multiple tasks fail on the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to a blacklist 
> and avoid scheduling tasks on it.
>  ** Introduce a new exception EnvironmentException to wrap all such issues.
>  * Recoverable
>  ** We assume all other issues are recoverable.
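
A minimal sketch of the proposed classification (illustrative only; the names 
mirror this proposal, not shipped Flink code):

{code:java}
/** Coarse failure categories that drive the failover decision. */
public enum ThrowableType {
    NON_RECOVERABLE,              // e.g. wrapped user-code errors: never retry
    PARTITION_DATA_MISSING_ERROR, // revoke and rerun the producer task
    ENVIRONMENT_ERROR,            // consider blacklisting the machine
    RECOVERABLE                   // transient issues: retry up to a limit
}
{code}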



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-26 Thread JIN SUN (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JIN SUN updated FLINK-10205:

Affects Version/s: 1.6.2
   1.6.1

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve 
> better performance. However, when a DataSourceTask fails and reruns, it will 
> not get the same splits as its previous attempt; this can introduce 
> inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), 
> those two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits in the ExecutionVertex, and 
> the DataSourceTask will pull splits from there (a sketch follows below).
>  
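
A hedged sketch of the idea (class and method names are hypothetical, not the 
actual Flink implementation):

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/**
 * Per-vertex split log: a restarted attempt first replays exactly the splits
 * handed to its predecessor, then continues pulling fresh splits, so every
 * attempt of the same task sees the same input sequence.
 */
final class SplitLog<S> {

    private final List<S> handedOut = new ArrayList<>();
    private final Supplier<S> source; // next global split, or null when exhausted

    SplitLog(Supplier<S> source) {
        this.source = source;
    }

    /** Splits already assigned to this vertex, for a rerun to replay. */
    Iterator<S> replay() {
        return new ArrayList<>(handedOut).iterator();
    }

    /** Pull a fresh split and remember it for future attempts. */
    S next() {
        S split = source.get();
        if (split != null) {
            handedOut.add(split);
        }
        return split;
    }
}
{code}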



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629473#comment-16629473
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220735704
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 ##
 @@ -435,7 +435,6 @@ private void assertEqualStateMetaInfoSnapshots(StateMetaInfoSnapshot expected, S
	Assert.assertEquals(expected.getName(), actual.getName());
	Assert.assertEquals(expected.getBackendStateType(), actual.getBackendStateType());
	Assert.assertEquals(expected.getOptionsImmutable(), actual.getOptionsImmutable());
-	Assert.assertEquals(expected.getSerializersImmutable(), actual.getSerializersImmutable());
 
 Review comment:
   Just double-checking: Was this intentionally removed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run, already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629475#comment-16629475
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220738712
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data 
in the checkpoint.
+ * This serves three purposes:
+ *
+ * 
+ *   Capturing serializer parameters and schema: a 
serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a 
serializer.
+ *   This is explained in more detail below.
  *
- * The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   Compatibility checks for new serializers: when new 
serializers are available,
+   they need to be checked whether or not they are compatible to read the data written by the previous serializer.
+   This is performed by providing the new serializer to the corresponding serializer configuration snapshots in checkpoints.
+ *
+ *   Factory for a read serializer when schema conversion is 
required: in the case that new
+ *   serializers are not compatible to read previous data, a schema conversion 
process executed across all data
+ *   is required before the new serializer can be continued to be used. This 
conversion process requires a compatible
+ *   read serializer to restore serialized bytes as objects, and then written 
back again using the new serializer.
+ *   In this scenario, the serializer configuration snapshots in checkpoints 
doubles as a factory for the read
+ *   serializer of the conversion process.
+ * 
+ *
+ * Serializer Configuration and Schema
+ *
+ * Since serializer configuration snapshots needs to be used to ensure 
serialization compatibility
+ * for the same managed state as well as serving as a factory for compatible 
read serializers, the configuration
+ * snapshot should encode sufficient information about:
  *
  * 
  *   Parameter settings of the serializer: parameters of 
the serializer include settings
  *   required to setup the serializer, or the state of the serializer if it is 
stateful. If the serializer
  *   has nested serializers, then the configuration snapshot should also 
contain the parameters of the nested
  *   serializers.
  *
- *   Serialization schema of the serializer: the data 
format used by the serializer.
+ *   Serialization schema of the serializer: the binary 
format used by the serializer, or
+ *   in other words, the schema of data written by the serializer.
  * 
  *
  * NOTE: Implementations must contain the default empty nullary 
constructor. This is required to be able to
  * deserialize the configuration snapshot from its binary form.
+ *
+ * @param <T> The data type that the originating serializer of this 
configuration serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializerConfigSnapshot extends 
VersionedIOReadableWritable {
+public abstract class TypeSerializerConfigSnapshot<T> extends 
VersionedIOReadableWritable {
 
/** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
private ClassLoader userCodeClassLoader;
 
+   /**
+* The originating serializer of this configuration snapshot.
+*
+* TODO to allow for incrementally adapting the implementation of 
serializer config snapshot subclasses,
+* TODO we currently have a base implementation for the {@link 
#restoreSerializer()}
+* TODO method which simply returns this serializer instance. The 
serializer is written
+* TODO and read using Java serialization as part of reading / writing 
the config snapshot
+*/
+   private TypeSerializer 

[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629474#comment-16629474
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220726336
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotSerializationUtil {
+
+   /**
+* Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
+*
+* It is written in a format that can later be read again using
+* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+*
+* @param out the data output view
+* @param serializerConfigSnapshot the serializer configuration 
snapshot to write
+*
+* @throws IOException
+*/
+   public static <T> void writeSerializerConfigSnapshot(
+   DataOutputView out,
+   TypeSerializerConfigSnapshot serializerConfigSnapshot,
+   TypeSerializer<T> serializer) throws IOException {
+
+   new 
TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, 
serializer).write(out);
+   }
+
+   /**
+* Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshot
+*
+* @throws IOException
+*/
+   public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   final TypeSerializerConfigSnapshotSerializationProxy proxy = 
new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+
+   return proxy.getSerializerConfigSnapshot();
+   }
+
+   /**
+* Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that were previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshots
+*
+* @throws IOException
+*/
+   public static List<TypeSerializerConfigSnapshot> 
readSerializerConfigSnapshots(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   int numFields = in.readInt();
+   final List<TypeSerializerConfigSnapshot> 
serializerConfigSnapshots = new ArrayList<>(numFields);
+
+   TypeSerializerConfigSnapshotSerializationProxy proxy;
 
 Review comment:
   Minor comment: Looks like that variable could be defined inside the loop. 
A good rule of thumb is that variables should have the minimal scope by default.
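
   For illustration, the same loop with the narrowed scope the comment asks for (identifiers taken from the diff above; only the declaration moves):

{code:java}
for (int i = 0; i < numFields; i++) {
	// declared inside the loop: each iteration gets a fresh proxy and the
	// variable cannot be accessed (or accidentally reused) after the loop
	TypeSerializerConfigSnapshotSerializationProxy proxy =
		new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
	proxy.read(in);
	serializerConfigSnapshots.add(proxy.getSerializerConfigSnapshot());
}
{code}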


This is 

[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629471#comment-16629471
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220734142
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
write the serializers, but
+ * instead use the configuration snapshot as a factory to instantiate 
serializers for restoring state.
+ * However, since some outdated implementations of configuration snapshots did 
not contain sufficient
+ * information to serve as a factory, the backwards compatible path for 
restoring from these older
+ * savepoints would be to just use the written serializer.
+ *
+ * Therefore, when restoring from older savepoints which still contained 
both the config snapshot
+ * and the serializer, they are both wrapped within this utility class. When 
the caller intends
+ * to instantiate a restore serializer, we simply return the wrapped 
serializer instance.
+ *
+ * @param <T> the data type that the wrapped serializer instance serializes.
+ */
+@Internal
+public class BackwardsCompatibleConfigSnapshot<T> extends 
TypeSerializerConfigSnapshot<T> {
+
+   /**
+* The actual serializer config snapshot. This may be {@code null} when 
reading a
+* savepoint from Flink <= 1.2.
+*/
+   @Nullable
+   private TypeSerializerConfigSnapshot wrappedConfigSnapshot;
+
+   /**
+* The serializer instance written in savepoints.
+*/
+   @Nonnull
+   private TypeSerializer serializerInstance;
+
+   public BackwardsCompatibleConfigSnapshot(
+   @Nullable TypeSerializerConfigSnapshot 
wrappedConfigSnapshot,
+   TypeSerializer serializerInstance) {
+
+   this.wrappedConfigSnapshot = wrappedConfigSnapshot;
+   this.serializerInstance = 
Preconditions.checkNotNull(serializerInstance);
+   }
+
+   @Override
+   public void write(DataOutputView out) throws IOException {
+   throw new UnsupportedOperationException(
+   "This is a dummy config snapshot used only for 
backwards compatibility.");
+   }
+
+   @Override
+   public void read(DataInputView in) throws IOException {
+   throw new UnsupportedOperationException(
+   "This is a dummy config snapshot used only for 
backwards compatibility.");
+   }
+
+   @Override
+   public int getVersion() {
+   throw new UnsupportedOperationException(
+   "This is a dummy config snapshot used only for 
backwards compatibility.");
+   }
+
+   @Override
+   public TypeSerializer restoreSerializer() {
+   return serializerInstance;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public TypeSerializerSchemaCompatibility 
resolveSchemaCompatibility(TypeSerializer newSerializer) {
+   if (wrappedConfigSnapshot != null) {
+   return (TypeSerializerSchemaCompatibility) 
wrappedConfigSnapshot.resolveSchemaCompatibility(newSerializer);
+ 

[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629470#comment-16629470
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220727617
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotSerializationUtil {
+
+   /**
+* Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
+*
+* It is written in a format that can later be read again using
+* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+*
+* @param out the data output view
+* @param serializerConfigSnapshot the serializer configuration 
snapshot to write
+*
+* @throws IOException
+*/
+   public static <T> void writeSerializerConfigSnapshot(
+   DataOutputView out,
+   TypeSerializerConfigSnapshot serializerConfigSnapshot,
+   TypeSerializer<T> serializer) throws IOException {
+
+   new 
TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, 
serializer).write(out);
+   }
+
+   /**
+* Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshot
+*
+* @throws IOException
+*/
+   public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   final TypeSerializerConfigSnapshotSerializationProxy proxy = 
new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+
+   return proxy.getSerializerConfigSnapshot();
+   }
+
+   /**
+* Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that were previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshots
+*
+* @throws IOException
+*/
+   public static List<TypeSerializerConfigSnapshot> 
readSerializerConfigSnapshots(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   int numFields = in.readInt();
+   final List<TypeSerializerConfigSnapshot> 
serializerConfigSnapshots = new ArrayList<>(numFields);
+
+   TypeSerializerConfigSnapshotSerializationProxy proxy;
+   for (int i = 0; i < numFields; i++) {
+   proxy = new 
TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+   

[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629472#comment-16629472
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220739021
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
 ##
 @@ -77,6 +77,11 @@ private void resolveVersionRead(int readVersion) throws 
VersionMismatchException
}
}
 
+   // TODO this is a temporary workaround for FLINK-9377 that 
should be removed
+   if (readVersion == getVersion() + 1) {
 
 Review comment:
   This seems fragile here, because it is in a common base class outside the 
serializer utils.
   Can this be solved by overriding "getCompatibleVersions()" instead?
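
   For illustration, a rough sketch of that alternative (the subclass name is hypothetical; it assumes the default getCompatibleVersions() returns only the current version, which the subclass widens):

{code:java}
import org.apache.flink.core.io.VersionedIOReadableWritable;

// Instead of special-casing readVersion == getVersion() + 1 in the common base
// class, a subclass declares explicitly which versions it can read.
public class ConfigSnapshotReadProxy extends VersionedIOReadableWritable {

	private static final int VERSION = 2;

	@Override
	public int getVersion() {
		return VERSION;
	}

	@Override
	public int[] getCompatibleVersions() {
		// this reader understands the current format and the previous one
		return new int[] {VERSION, VERSION - 1};
	}
}
{code}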


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> when performing even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.
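
A minimal, self-contained sketch of that factory idea (hypothetical classes, deliberately much simpler than Flink's actual TypeSerializer API):

{code:java}
import java.nio.charset.Charset;

// The snapshot records the serializer's only schema-relevant parameter (the
// charset) and can re-create a serializer able to read the old bytes, without
// the serializer itself ever being Java-serialized into the checkpoint.
final class CharsetStringSerializer {

	private final Charset charset;

	CharsetStringSerializer(Charset charset) { this.charset = charset; }

	byte[] serialize(String value) { return value.getBytes(charset); }

	String deserialize(byte[] bytes) { return new String(bytes, charset); }

	ConfigSnapshot snapshotConfiguration() { return new ConfigSnapshot(charset.name()); }

	// the config snapshot doubles as the factory for a compatible read serializer
	static final class ConfigSnapshot {

		private final String charsetName;

		ConfigSnapshot(String charsetName) { this.charsetName = charsetName; }

		CharsetStringSerializer restoreSerializer() {
			return new CharsetStringSerializer(Charset.forName(charsetName));
		}
	}
}
{code}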



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629476#comment-16629476
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220738283
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotSerializationUtil {
+
+   /**
+* Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
+*
+* It is written in a format that can later be read again using
+* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+*
+* @param out the data output view
+* @param serializerConfigSnapshot the serializer configuration 
snapshot to write
+*
+* @throws IOException
+*/
+   public static <T> void writeSerializerConfigSnapshot(
+   DataOutputView out,
+   TypeSerializerConfigSnapshot serializerConfigSnapshot,
+   TypeSerializer<T> serializer) throws IOException {
+
+   new 
TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, 
serializer).write(out);
+   }
+
+   /**
+* Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshot
+*
+* @throws IOException
+*/
+   public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   final TypeSerializerConfigSnapshotSerializationProxy proxy = 
new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+
+   return proxy.getSerializerConfigSnapshot();
+   }
+
+   /**
+* Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that were previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshots
+*
+* @throws IOException
+*/
+   public static List<TypeSerializerConfigSnapshot> 
readSerializerConfigSnapshots(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   int numFields = in.readInt();
+   final List<TypeSerializerConfigSnapshot> 
serializerConfigSnapshots = new ArrayList<>(numFields);
+
+   TypeSerializerConfigSnapshotSerializationProxy proxy;
+   for (int i = 0; i < numFields; i++) {
+   proxy = new 
TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+   

[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-26 Thread GitBox
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220738283
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotSerializationUtil {
+
+   /**
+* Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
+*
+* It is written in a format that can later be read again using
+* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+*
+* @param out the data output view
+* @param serializerConfigSnapshot the serializer configuration 
snapshot to write
+*
+* @throws IOException
+*/
+   public static <T> void writeSerializerConfigSnapshot(
+   DataOutputView out,
+   TypeSerializerConfigSnapshot serializerConfigSnapshot,
+   TypeSerializer<T> serializer) throws IOException {
+
+   new 
TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, 
serializer).write(out);
+   }
+
+   /**
+* Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshot
+*
+* @throws IOException
+*/
+   public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   final TypeSerializerConfigSnapshotSerializationProxy proxy = 
new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+
+   return proxy.getSerializerConfigSnapshot();
+   }
+
+   /**
+* Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that were previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshots
+*
+* @throws IOException
+*/
+   public static List<TypeSerializerConfigSnapshot> 
readSerializerConfigSnapshots(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   int numFields = in.readInt();
+   final List<TypeSerializerConfigSnapshot> 
serializerConfigSnapshots = new ArrayList<>(numFields);
+
+   TypeSerializerConfigSnapshotSerializationProxy proxy;
+   for (int i = 0; i < numFields; i++) {
+   proxy = new 
TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+   
serializerConfigSnapshots.add(proxy.getSerializerConfigSnapshot());
+   }
+
+   return serializerConfigSnapshots;
+   }
+
+   /**
+* Utility serialization proxy for a {@link 
TypeSerializerConfigSnapshot}.
+*/
+   
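
As a usage sketch for the utility above, a write/read round trip (StringSerializer, DataOutputSerializer and DataInputDeserializer are assumed here for illustration; the two util calls follow the signatures shown in the diff):

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

public class ConfigSnapshotRoundTrip {

	public static void main(String[] args) throws IOException {
		TypeSerializer<String> serializer = StringSerializer.INSTANCE;

		// write the serializer's config snapshot to an in-memory output view
		DataOutputSerializer out = new DataOutputSerializer(128);
		TypeSerializerConfigSnapshotSerializationUtil.writeSerializerConfigSnapshot(
			out, serializer.snapshotConfiguration(), serializer);

		// read it back; the class loader is used to resolve the snapshot class
		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
		TypeSerializerConfigSnapshot restored =
			TypeSerializerConfigSnapshotSerializationUtil.readSerializerConfigSnapshot(
				in, Thread.currentThread().getContextClassLoader());

		System.out.println(restored.getClass().getName());
	}
}
{code}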

[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-26 Thread GitBox
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220735704
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 ##
 @@ -435,7 +435,6 @@ private void 
assertEqualStateMetaInfoSnapshots(StateMetaInfoSnapshot expected, S
Assert.assertEquals(expected.getName(), actual.getName());
Assert.assertEquals(expected.getBackendStateType(), 
actual.getBackendStateType());
Assert.assertEquals(expected.getOptionsImmutable(), 
actual.getOptionsImmutable());
-   Assert.assertEquals(expected.getSerializersImmutable(), 
actual.getSerializersImmutable());
 
 Review comment:
   Just double-checking: Was this intentionally removed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-26 Thread GitBox
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220734142
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
write the serializers, but
+ * instead use the configuration snapshot as a factory to instantiate 
serializers for restoring state.
+ * However, since some outdated implementations of configuration snapshots did 
not contain sufficient
+ * information to serve as a factory, the backwards compatible path for 
restoring from these older
+ * savepoints would be to just use the written serializer.
+ *
+ * Therefore, when restoring from older savepoints which still contained 
both the config snapshot
+ * and the serializer, they are both wrapped within this utility class. When 
the caller intends
+ * to instantiate a restore serializer, we simply return the wrapped 
serializer instance.
+ *
+ * @param <T> the data type that the wrapped serializer instance serializes.
+ */
+@Internal
+public class BackwardsCompatibleConfigSnapshot<T> extends 
TypeSerializerConfigSnapshot<T> {
+
+   /**
+* The actual serializer config snapshot. This may be {@code null} when 
reading a
+* savepoint from Flink <= 1.2.
+*/
+   @Nullable
+   private TypeSerializerConfigSnapshot wrappedConfigSnapshot;
+
+   /**
+* The serializer instance written in savepoints.
+*/
+   @Nonnull
+   private TypeSerializer serializerInstance;
+
+   public BackwardsCompatibleConfigSnapshot(
+   @Nullable TypeSerializerConfigSnapshot 
wrappedConfigSnapshot,
+   TypeSerializer serializerInstance) {
+
+   this.wrappedConfigSnapshot = wrappedConfigSnapshot;
+   this.serializerInstance = 
Preconditions.checkNotNull(serializerInstance);
+   }
+
+   @Override
+   public void write(DataOutputView out) throws IOException {
+   throw new UnsupportedOperationException(
+   "This is a dummy config snapshot used only for 
backwards compatibility.");
+   }
+
+   @Override
+   public void read(DataInputView in) throws IOException {
+   throw new UnsupportedOperationException(
+   "This is a dummy config snapshot used only for 
backwards compatibility.");
+   }
+
+   @Override
+   public int getVersion() {
+   throw new UnsupportedOperationException(
+   "This is a dummy config snapshot used only for 
backwards compatibility.");
+   }
+
+   @Override
+   public TypeSerializer restoreSerializer() {
+   return serializerInstance;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public TypeSerializerSchemaCompatibility 
resolveSchemaCompatibility(TypeSerializer newSerializer) {
+   if (wrappedConfigSnapshot != null) {
+   return (TypeSerializerSchemaCompatibility) 
wrappedConfigSnapshot.resolveSchemaCompatibility(newSerializer);
+   } else {
+   // if there is no configuration snapshot to check 
against,
+   // then we can only assume that the new serializer is 
compatible as is
+   return 

[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-26 Thread GitBox
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220727617
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotSerializationUtil {
+
+   /**
+* Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
+*
+* It is written in a format that can later be read again using
+* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+*
+* @param out the data output view
+* @param serializerConfigSnapshot the serializer configuration 
snapshot to write
+*
+* @throws IOException
+*/
+   public static <T> void writeSerializerConfigSnapshot(
+   DataOutputView out,
+   TypeSerializerConfigSnapshot serializerConfigSnapshot,
+   TypeSerializer<T> serializer) throws IOException {
+
+   new 
TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, 
serializer).write(out);
+   }
+
+   /**
+* Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshot
+*
+* @throws IOException
+*/
+   public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   final TypeSerializerConfigSnapshotSerializationProxy proxy = 
new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+
+   return proxy.getSerializerConfigSnapshot();
+   }
+
+   /**
+* Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that were previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshots
+*
+* @throws IOException
+*/
+   public static List<TypeSerializerConfigSnapshot> 
readSerializerConfigSnapshots(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   int numFields = in.readInt();
+   final List<TypeSerializerConfigSnapshot> 
serializerConfigSnapshots = new ArrayList<>(numFields);
+
+   TypeSerializerConfigSnapshotSerializationProxy proxy;
+   for (int i = 0; i < numFields; i++) {
+   proxy = new 
TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+   
serializerConfigSnapshots.add(proxy.getSerializerConfigSnapshot());
+   }
+
+   return serializerConfigSnapshots;
+   }
+
+   /**
+* Utility serialization proxy for a {@link 
TypeSerializerConfigSnapshot}.
+*/
+   

[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-26 Thread GitBox
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220738712
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer}'s configuration.
- * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data 
in the checkpoint.
+ * This serves three purposes:
+ *
+ * 
+ *   Capturing serializer parameters and schema: a 
serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a 
serializer.
+ *   This is explained in more detail below.
  *
- * The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   Compatibility checks for new serializers: when new 
serializers are available,
+ *   it must be checked whether they are able to read the 
data written by the previous serializer.
+ *   This is performed by providing the new serializer to the corresponding 
serializer configuration
+ *   snapshots in checkpoints.
+ *
+ *   Factory for a read serializer when schema conversion is 
required: in the case that new
+ *   serializers are not compatible to read previous data, a schema conversion 
process executed across all data
+ *   is required before the new serializer can continue to be used. This 
conversion process requires a compatible
+ *   read serializer to restore serialized bytes as objects, which are then written 
back again using the new serializer.
+ *   In this scenario, the serializer configuration snapshots in checkpoints 
double as a factory for the read
+ *   serializer of the conversion process.
+ * 
+ *
+ * Serializer Configuration and Schema
+ *
+ * Since serializer configuration snapshots need to be used to ensure 
serialization compatibility
+ * for the same managed state as well as serving as a factory for compatible 
read serializers, the configuration
+ * snapshot should encode sufficient information about:
  *
  * 
  *   Parameter settings of the serializer: parameters of 
the serializer include settings
 *   required to set up the serializer, or the state of the serializer if it is 
stateful. If the serializer
  *   has nested serializers, then the configuration snapshot should also 
contain the parameters of the nested
  *   serializers.
  *
- *   Serialization schema of the serializer: the data 
format used by the serializer.
+ *   Serialization schema of the serializer: the binary 
format used by the serializer, or
+ *   in other words, the schema of data written by the serializer.
  * 
  *
  * NOTE: Implementations must contain the default empty nullary 
constructor. This is required to be able to
  * deserialize the configuration snapshot from its binary form.
+ *
+ * @param <T> The data type that the originating serializer of this 
configuration serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializerConfigSnapshot extends 
VersionedIOReadableWritable {
+public abstract class TypeSerializerConfigSnapshot<T> extends 
VersionedIOReadableWritable {
 
/** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
private ClassLoader userCodeClassLoader;
 
+   /**
+* The originating serializer of this configuration snapshot.
+*
+* TODO to allow for incrementally adapting the implementation of 
serializer config snapshot subclasses,
+* TODO we currently have a base implementation for the {@link 
#restoreSerializer()}
+* TODO method which simply returns this serializer instance. The 
serializer is written
+* TODO and read using Java serialization as part of reading / writing 
the config snapshot
+*/
+   private TypeSerializer<T> serializer;
+
+   /**
+* Creates a serializer using this configuration, that is capable of 
reading data
+* written by the serializer described by this configuration.
+*
+* @return the restored serializer.
+  

[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-26 Thread GitBox
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220726336
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotSerializationUtil {
+
+   /**
+* Writes a {@link TypeSerializerConfigSnapshot} to the provided data 
output view.
+*
+* It is written in a format that can later be read again using
+* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}.
+*
+* @param out the data output view
+* @param serializerConfigSnapshot the serializer configuration 
snapshot to write
+*
+* @throws IOException
+*/
+   public static <T> void writeSerializerConfigSnapshot(
+   DataOutputView out,
+   TypeSerializerConfigSnapshot serializerConfigSnapshot,
+   TypeSerializer<T> serializer) throws IOException {
+
+   new 
TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, 
serializer).write(out);
+   }
+
+   /**
+* Reads from a data input view a {@link TypeSerializerConfigSnapshot} 
that was previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshot
+*
+* @throws IOException
+*/
+   public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   final TypeSerializerConfigSnapshotSerializationProxy proxy = 
new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader);
+   proxy.read(in);
+
+   return proxy.getSerializerConfigSnapshot();
+   }
+
+   /**
+* Reads from a data input view multiple {@link 
TypeSerializerConfigSnapshot}s that were previously
+* written using {@link 
TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView,
 TypeSerializerConfigSnapshot, TypeSerializer)}.
+*
+* @param in the data input view
+* @param userCodeClassLoader the user code class loader to use
+*
+* @return the read serializer configuration snapshots
+*
+* @throws IOException
+*/
+   public static List<TypeSerializerConfigSnapshot> 
readSerializerConfigSnapshots(
+   DataInputView in,
+   ClassLoader userCodeClassLoader) throws IOException {
+
+   int numFields = in.readInt();
+   final List<TypeSerializerConfigSnapshot> 
serializerConfigSnapshots = new ArrayList<>(numFields);
+
+   TypeSerializerConfigSnapshotSerializationProxy proxy;
 
 Review comment:
   Minor comment: Looks like that variable could be defined inside the loop. 
A good rule of thumb is that variables should have the minimal scope by default.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-26 Thread GitBox
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, 
state backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220739021
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
 ##
 @@ -77,6 +77,11 @@ private void resolveVersionRead(int readVersion) throws 
VersionMismatchException
}
}
 
+   // TODO this is a temporary workaround for FLINK-9377 that 
should be removed
+   if (readVersion == getVersion() + 1) {
 
 Review comment:
   This seems fragile here, because it is in a common base class outside the 
serializer utils.
   Can this be solved by overriding "getCompatibleVersions()" instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10228) Add metrics for netty direct memory consumption

2018-09-26 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-10228:
---
Description: netty direct memory usage can be exposed via metrics so that 
operator can keep track of memory consumption .  (was: netty direct memory 
usage can be exposed via metrics so that operator can keep track of memory 
consumption.)

> Add metrics for netty direct memory consumption
> ---
>
> Key: FLINK-10228
> URL: https://issues.apache.org/jira/browse/FLINK-10228
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> netty direct memory usage can be exposed via metrics so that operator can 
> keep track of memory consumption .



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10444) Make S3 entropy injection work with FileSystem safety net

2018-09-26 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-10444.


> Make S3 entropy injection work with FileSystem safety net
> -
>
> Key: FLINK-10444
> URL: https://issues.apache.org/jira/browse/FLINK-10444
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.7.0, 1.6.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> The FileSystem Safety net wraps FileSystems.
> The EntropyInjector needs to be aware of that to recognize filesystems as 
> entropy injecting.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10444) Make S3 entropy injection work with FileSystem safety net

2018-09-26 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-10444.
--
Resolution: Fixed

Fixed in
  - 1.6.2 via ec73c60aba5d564d8d946d82a4d7d0911e70dc9f
  - 1.7.0 via cc9b3769084634dc660f7a90aed0090cb46b3342

> Make S3 entropy injection work with FileSystem safety net
> -
>
> Key: FLINK-10444
> URL: https://issues.apache.org/jira/browse/FLINK-10444
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.7.0, 1.6.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> The FileSystem Safety net wraps FileSystems.
> The EntropyInjector needs to be aware of that to recognize filesystems as 
> entropy injecting.
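
The gist of the fix, as a sketch (SafetyNetWrapperFileSystem and EntropyInjectingFileSystem are the core-fs types involved; the exact call site inside the EntropyInjector may differ):

{code:java}
// Look through the safety-net wrapper before testing for entropy awareness;
// otherwise a wrapped entropy-injecting file system is not recognized as one.
FileSystem fs = fileSystem;
if (fs instanceof SafetyNetWrapperFileSystem) {
	fs = ((SafetyNetWrapperFileSystem) fs).getWrappedDelegate();
}
final boolean entropyInjecting = fs instanceof EntropyInjectingFileSystem;
{code}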



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629239#comment-16629239
 ] 

ASF GitHub Bot commented on FLINK-9455:
---

GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support 
for multi task slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734#discussion_r220674263
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void 
payload) {
//  Resource Management
// 

 
-   protected int getNumberPendingSlotRequests() {
-   return slotManager.getNumberPendingSlotRequests();
+   protected int getNumberRequiredTaskManagerSlots() {
+   return slotManager.getNumberPendingTaskManagerSlots();
+   }
+
+   // 

+   //  Helper methods
+   // 

+
+   protected static Collection<ResourceProfile> createSlotsPerWorker(int 
numSlots) {
+   final Collection<ResourceProfile> slots = new 
ArrayList<>(numSlots);
 
 Review comment:
   It would be enough to write:
   `Collections.nCopies(numSlots, ResourceProfile.ANY);` 
   
   The returned `List` would also be immutable. At the moment there is no need 
to return a mutable list. I would even say it is not right that after calling 
`Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile)`, 
one may potentially modify the state of the ResourceManager (by accidentally 
modifying the returned collection).
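
   A small self-contained sketch of the suggestion (the nested enum merely stands in for Flink's ResourceProfile):

{code:java}
import java.util.Collection;
import java.util.Collections;

public class SlotsPerWorkerExample {

	// stand-in for org.apache.flink.runtime.clusterframework.types.ResourceProfile
	enum ResourceProfile { ANY }

	// the suggestion: one immutable list of numSlots identical entries,
	// instead of filling a fresh ArrayList by hand
	static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
		return Collections.nCopies(numSlots, ResourceProfile.ANY);
	}

	public static void main(String[] args) {
		Collection<ResourceProfile> slots = createSlotsPerWorker(3);
		System.out.println(slots); // [ANY, ANY, ANY]
		// slots.add(ResourceProfile.ANY) would throw UnsupportedOperationException
	}
}
{code}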


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The {{SlotManager}}, which is responsible for managing all available slots of 
> a Flink cluster, can request to start new {{TaskManagers}} if it cannot 
> fulfill a slot request. A {{TaskManager}} may be started with multiple slots 
> configured, but currently the {{SlotManager}} assumes that it will be started 
> with a single slot. As a consequence, it might issue multiple requests to 
> start new TaskManagers even though a single one would be sufficient to 
> fulfill all pending slot requests.
> In order to avoid requesting unnecessary resources that are freed again after 
> the idle timeout, I suggest making the {{SlotManager}} aware of how many slots 
> a {{TaskManager}} is started with. That way, the SlotManager only needs to 
> request a new {{TaskManager}} if all of the previously started slots 
> (potentially not yet registered and, thus, future slots) have been assigned 
> to slot requests.
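
Condensed pseudo-logic of the proposed accounting (illustrative only; getNumberAssignedPendingSlots is a made-up name, and the real bookkeeping in the SlotManager is more involved):

{code:java}
// Only ask for a new TaskManager once every slot of the already-requested
// (but not yet registered) TaskManagers is spoken for.
void onUnfulfilledSlotRequest(ResourceProfile requestedProfile) {
	int pendingSlots = getNumberPendingTaskManagerSlots(); // future slots of started TMs
	int assignedPending = getNumberAssignedPendingSlots(); // hypothetical counter
	if (pendingSlots - assignedPending <= 0) {
		startNewWorker(requestedProfile);
	}
}
{code}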



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors

2018-09-26 Thread GitBox
GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support 
for multi task slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734#discussion_r220674263
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void 
payload) {
//  Resource Management
// 

 
-   protected int getNumberPendingSlotRequests() {
-   return slotManager.getNumberPendingSlotRequests();
+   protected int getNumberRequiredTaskManagerSlots() {
+   return slotManager.getNumberPendingTaskManagerSlots();
+   }
+
+   // 

+   //  Helper methods
+   // 

+
+   protected static Collection<ResourceProfile> createSlotsPerWorker(int 
numSlots) {
+   final Collection<ResourceProfile> slots = new 
ArrayList<>(numSlots);
 
 Review comment:
   It would be enough to write:
   `Collections.nCopies(numSlots, ResourceProfile.ANY);` 
   
   The returned `List` would also be immutable. At the moment there is no need 
to return a mutable list. I would even say it is not right that after calling 
`Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile)`, 
one may potentially modify the state of the ResourceManager (by accidentally 
modifying the returned collection).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629238#comment-16629238
 ] 

ASF GitHub Bot commented on FLINK-9455:
---

GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support 
for multi task slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734#discussion_r220674263
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     //  Resource Management
     // ------------------------------------------------------------------------
 
-    protected int getNumberPendingSlotRequests() {
-        return slotManager.getNumberPendingSlotRequests();
+    protected int getNumberRequiredTaskManagerSlots() {
+        return slotManager.getNumberPendingTaskManagerSlots();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Helper methods
+    // ------------------------------------------------------------------------
+
+    protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
+        final Collection<ResourceProfile> slots = new ArrayList<>(numSlots);
 
 Review comment:
   It would be enough to write:
   `Collections.nCopies(numSlots, ResourceProfile.ANY);` 
   
   The returned `List` would also be immutable. At the moment there is no need 
to return a mutable list. I would even say it is not right that after calling 
`Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile)`, 
one may potentially modify the state of the ResourceManager.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The {{SlotManager}}, responsible for managing all available slots of a Flink 
> cluster, can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. A {{TaskManager}} can be started with multiple slots configured, but 
> currently the {{SlotManager}} assumes that it will be started with a single 
> slot. As a consequence, it might issue multiple requests to start new 
> TaskManagers even though a single one would be sufficient to fulfill all 
> pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest making the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the {{SlotManager}} only needs to 
> request a new {{TaskManager}} if all of the slots of previously started 
> TaskManagers (potentially not yet registered and, thus, future slots) are 
> already assigned to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629235#comment-16629235
 ] 

ASF GitHub Bot commented on FLINK-9455:
---

GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support 
for multi task slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734#discussion_r220674263
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     //  Resource Management
     // ------------------------------------------------------------------------
 
-    protected int getNumberPendingSlotRequests() {
-        return slotManager.getNumberPendingSlotRequests();
+    protected int getNumberRequiredTaskManagerSlots() {
+        return slotManager.getNumberPendingTaskManagerSlots();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Helper methods
+    // ------------------------------------------------------------------------
+
+    protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
+        final Collection<ResourceProfile> slots = new ArrayList<>(numSlots);
 
 Review comment:
   It would be enough to write:
   `Collections.nCopies(numSlots, ResourceProfile.ANY);` 
   
   The returned `List` is also immutable. At the moment there is no need to 
return a mutable list. I would even say it is not right that after calling 
`Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile)`, 
one may potentially modify the state of the ResourceManager.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make SlotManager aware of multi slot TaskManagers
> -
>
> Key: FLINK-9455
> URL: https://issues.apache.org/jira/browse/FLINK-9455
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, ResourceManager
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The {{SlotManager}}, responsible for managing all available slots of a Flink 
> cluster, can request to start new {{TaskManagers}} if it cannot fulfill a slot 
> request. A {{TaskManager}} can be started with multiple slots configured, but 
> currently the {{SlotManager}} assumes that it will be started with a single 
> slot. As a consequence, it might issue multiple requests to start new 
> TaskManagers even though a single one would be sufficient to fulfill all 
> pending slot requests.
> In order to avoid requesting unnecessary resources which are freed after the 
> idle timeout, I suggest making the {{SlotManager}} aware of how many slots a 
> {{TaskManager}} is started with. That way the {{SlotManager}} only needs to 
> request a new {{TaskManager}} if all of the slots of previously started 
> TaskManagers (potentially not yet registered and, thus, future slots) are 
> already assigned to slot requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors

2018-09-26 Thread GitBox
GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support 
for multi task slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734#discussion_r220674263
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     //  Resource Management
     // ------------------------------------------------------------------------
 
-    protected int getNumberPendingSlotRequests() {
-        return slotManager.getNumberPendingSlotRequests();
+    protected int getNumberRequiredTaskManagerSlots() {
+        return slotManager.getNumberPendingTaskManagerSlots();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Helper methods
+    // ------------------------------------------------------------------------
+
+    protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
+        final Collection<ResourceProfile> slots = new ArrayList<>(numSlots);
 
 Review comment:
   It would be enough to write:
   `Collections.nCopies(numSlots, ResourceProfile.ANY);` 
   
   The returned `List` would also be immutable. At the moment there is no need 
to return a mutable list. I would even say it is not right that after calling 
`Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile)`, 
one may potentially modify the state of the ResourceManager.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors

2018-09-26 Thread GitBox
GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support 
for multi task slot TaskExecutors
URL: https://github.com/apache/flink/pull/6734#discussion_r220674263
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     //  Resource Management
     // ------------------------------------------------------------------------
 
-    protected int getNumberPendingSlotRequests() {
-        return slotManager.getNumberPendingSlotRequests();
+    protected int getNumberRequiredTaskManagerSlots() {
+        return slotManager.getNumberPendingTaskManagerSlots();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Helper methods
+    // ------------------------------------------------------------------------
+
+    protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
+        final Collection<ResourceProfile> slots = new ArrayList<>(numSlots);
 
 Review comment:
   It would be enough to write:
   `Collections.nCopies(numSlots, ResourceProfile.ANY);` 
   
   The returned `List` is also immutable. At the moment there is no need to 
return a mutable list. I would even say it is not right that after calling 
`Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile)`, 
one may potentially modify the state of the ResourceManager.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10426) Port TaskTest to new code base

2018-09-26 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629199#comment-16629199
 ] 

tison commented on FLINK-10426:
---

This sub-task should also consider testing that a {{Task}} fails if blobs are missing.

> Port TaskTest to new code base
> --
>
> Key: FLINK-10426
> URL: https://issues.apache.org/jira/browse/FLINK-10426
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{TaskTest}} to new code base



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10427) Port JobSubmitTest to new code base

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629197#comment-16629197
 ] 

ASF GitHub Bot commented on FLINK-10427:


TisonKun opened a new pull request #6768:  [FLINK-10427] [tests] Port 
JobSubmitTest to new code base
URL: https://github.com/apache/flink/pull/6768
 
 
   ## What is the purpose of the change
   
   Port `JobSubmitTest` to new code base
   
   ## Brief change log
   
   `JobSubmitTest` has three tests; here is the diagnosis for each:
   
   `testFailureWhenJarBlobsMissing` should be ported to `TaskTest#...`, since 
FLIP-6 loads the library on the TM, not the JM.
   
   To clarify, under FLIP-6 it is not the JM that fails the job when the library 
cannot be loaded. So this porting work becomes actionable only when `TaskTest` 
itself is ported. (Because `TaskTest` is a legacy test too, we are unable to 
just add a test case to it for now.)
   
   `testFailureWhenInitializeOnMasterFails` is covered by 
`JobSubmissionFailsITCase#testExceptionInInitializeOnMaster`
   
   `testAnswerFailureWhenSavepointReadFails` is covered by 
`SavepointITCase#testSubmitWithUnknownSavepointPath`
   
   ## Verifying this change
   
   This change only removes tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**no**)
   
   cc @tillrohrmann @GJL 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port JobSubmitTest to new code base
> ---
>
> Key: FLINK-10427
> URL: https://issues.apache.org/jira/browse/FLINK-10427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobSubmitTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10427) Port JobSubmitTest to new code base

2018-09-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10427:
---
Labels: pull-request-available  (was: )

> Port JobSubmitTest to new code base
> ---
>
> Key: FLINK-10427
> URL: https://issues.apache.org/jira/browse/FLINK-10427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobSubmitTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun opened a new pull request #6768: [FLINK-10427] [tests] Port JobSubmitTest to new code base

2018-09-26 Thread GitBox
TisonKun opened a new pull request #6768:  [FLINK-10427] [tests] Port 
JobSubmitTest to new code base
URL: https://github.com/apache/flink/pull/6768
 
 
   ## What is the purpose of the change
   
   Port `JobSubmitTest` to new code base
   
   ## Brief change log
   
   `JobSubmitTest` has three tests; here is the diagnosis for each:
   
   `testFailureWhenJarBlobsMissing` should be ported to `TaskTest#...`, since 
FLIP-6 loads the library on the TM, not the JM.
   
   To clarify, under FLIP-6 it is not the JM that fails the job when the library 
cannot be loaded. So this porting work becomes actionable only when `TaskTest` 
itself is ported. (Because `TaskTest` is a legacy test too, we are unable to 
just add a test case to it for now.)
   
   `testFailureWhenInitializeOnMasterFails` is covered by 
`JobSubmissionFailsITCase#testExceptionInInitializeOnMaster`
   
   `testAnswerFailureWhenSavepointReadFails` is covered by 
`SavepointITCase#testSubmitWithUnknownSavepointPath`
   
   ## Verifying this change
   
   This change only removes tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**no**)
   
   cc @tillrohrmann @GJL 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10427) Port JobSubmitTest to new code base

2018-09-26 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629002#comment-16629002
 ] 

tison edited comment on FLINK-10427 at 9/26/18 5:47 PM:


{{testFailureWhenJarBlobsMissing}} should be ported to {{TaskTest#...}}, since 
FLIP-6 loads the library on the TM, not the JM.

To clarify, under FLIP-6 it is not the JM that fails the job when the library 
cannot be loaded. So this porting work becomes actionable only when 
{{TaskTest}} itself is ported. (Because {{TaskTest}} is a legacy test too, we 
are unable to just add a test case to it for now.)


was (Author: tison):
{{testFailureWhenJarBlobsMissing}} should be ported to {{TaskTest#...}}, FLIP-6 
loads library on TM, not JM.

> Port JobSubmitTest to new code base
> ---
>
> Key: FLINK-10427
> URL: https://issues.apache.org/jira/browse/FLINK-10427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobSubmitTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10406) Port JobManagerTest to new code base

2018-09-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10406:
---
Labels: pull-request-available  (was: )

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629181#comment-16629181
 ] 

ASF GitHub Bot commented on FLINK-10406:


TisonKun commented on issue #6765: [FLINK-10406] [tests] Port JobManagerTest to 
new code base
URL: https://github.com/apache/flink/pull/6765#issuecomment-424804290
 
 
   FYI, Travis fails on 
`org.apache.flink.queryablestate.network.ClientTest#testSimpleRequests`, which 
I cannot reproduce locally. And since this pull request only modifies tests, 
without any changes to production files, it should not be the cause of the 
failure.
   
   Asking for a re-trigger or for someone to dig in, since it is said to be 
discouraged to push a commit just to re-trigger the build.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6765: [FLINK-10406] [tests] Port JobManagerTest to new code base

2018-09-26 Thread GitBox
TisonKun commented on issue #6765: [FLINK-10406] [tests] Port JobManagerTest to 
new code base
URL: https://github.com/apache/flink/pull/6765#issuecomment-424804290
 
 
   FYI, Travis fails on 
`org.apache.flink.queryablestate.network.ClientTest#testSimpleRequests`, which 
I cannot reproduce locally. And since this pull request only modifies tests, 
without any changes to production files, it should not be the cause of the 
failure.
   
   Asking for a re-trigger or for someone to dig in, since it is said to be 
discouraged to push a commit just to re-trigger the build.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter

2018-09-26 Thread Seth Wiesman (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629155#comment-16629155
 ] 

Seth Wiesman commented on FLINK-10423:
--

[~srichter] I have a question about the best way to handle resource management 
of the RocksDB native handle.

The basic implementation uses a TimerTask to periodically pull metrics from 
RocksDB; this works fine until it is time to release the resource. The question 
I have is how to be a good citizen with the ResourceGuard. Manually calling 
cancel on the TimerTask before calling the blocking cancel on the ResourceGuard 
seems brittle in the face of refactoring, and there does not seem to be any way 
to access a CancellableRegistry from within RocksDBStateBackend. 

 
The simplest solution I've been able to come up with is to add a new method to 
ResourceGuard that effectively acts as a way to acquire a weak reference. 

{code:java}
public boolean runIfResourceAvailable(Runnable function) {
    synchronized (lock) {
        if (!closed) {
            function.run();
        }

        // true if the function was run, false if the resource is (being) closed
        return !closed;
    }
}
{code}

Run atomically only if the resource is available, and return whether the 
function was run. When this method returns false the calling class knows the 
resource is being or has been closed and it is time to shut down. The concern I 
have here is the potential of putting too much work inside the synchronized 
block. 

I'm curious what you think is the best path forward.
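
For context, the periodic pull would then use such a method roughly like this (a sketch assuming the {{runIfResourceAvailable}} semantics above; the guard and metrics hook are placeholders):

{code:java}
import java.util.Timer;
import java.util.TimerTask;

class RocksDBMetricsPuller {
    // placeholder for the real ResourceGuard with the proposed method
    interface Guard {
        boolean runIfResourceAvailable(Runnable function);
    }

    static Timer start(Guard resourceGuard, Runnable pullMetrics) {
        Timer timer = new Timer("rocksdb-metrics", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                // pull only while the resource is open; once the guard reports
                // the resource as closed, the task cancels itself
                if (!resourceGuard.runIfResourceAvailable(pullMetrics)) {
                    cancel();
                }
            }
        }, 0L, 10_000L);
        return timer;
    }
}
{code}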
 

> Forward RocksDB memory metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wanting 
> greater insight into what RocksDB is doing. This work is inspired heavily by 
> the comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10427) Port JobSubmitTest to new code base

2018-09-26 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629130#comment-16629130
 ] 

tison commented on FLINK-10427:
---

{{testAnswerFailureWhenSavepointReadFails}} is covered by 
{{SavepointITCase#testSubmitWithUnknownSavepointPath}}

> Port JobSubmitTest to new code base
> ---
>
> Key: FLINK-10427
> URL: https://issues.apache.org/jira/browse/FLINK-10427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobSubmitTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629119#comment-16629119
 ] 

ASF GitHub Bot commented on FLINK-10415:


zentol commented on a change in pull request #6763: [FLINK-10415] Fail response 
future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220642450
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
 ##
 @@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
             .thenComposeAsync(
                 channel -> {
                     ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                    CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+                    CompletableFuture<JsonResponse> future;
+                    boolean success = false;
+
                     try {
-                        httpRequest.writeTo(channel);
+                        if (handler == null) {
+                            throw new IOException("Netty pipeline was not properly initialized.");
+                        } else {
+                            httpRequest.writeTo(channel);
+                            future = handler.getJsonFuture();
+                            success = true;
+                        }
                     } catch (IOException e) {
 
 Review comment:
   We should catch all exceptions here, then this issue wouldn't have occurred 
in the first place.
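
A generic illustration of that pattern (not the PR's code; any unexpected error fails the returned future instead of escaping the callback):

{code:java}
import java.util.concurrent.CompletableFuture;

class CatchAllDemo {
    static CompletableFuture<Void> submit(Runnable writeRequest) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            writeRequest.run();
            future.complete(null);
        } catch (Throwable t) {
            // catching Throwable (not just IOException) ensures the caller
            // always sees a failed future rather than a silently lost error
            future.completeExceptionally(new RuntimeException("Could not write request.", t));
        }
        return future;
    }
}
{code}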


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestClient does not react to lost connection
> 
>
> Key: FLINK-10415
> URL: https://issues.apache.org/jira/browse/FLINK-10415
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1, 1.7.0, 1.5.4
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not 
> seem to react to lost connections in time. When sending a request to the 
> current leader it happened that the leader was killed just after establishing 
> the connection. Then the {{RestClient}} did not fail the connection and was 
> stuck in writing a request or retrieving a response from the lost leader. I'm 
> wondering whether we should introduce a {{ReadTimeoutHandler}} and 
> {{WriteTimeoutHandler}} to handle these problems.
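
For reference, a sketch of how Netty's stock timeout handlers could be wired into a client pipeline (io.netty.handler.timeout; the 60-second values are illustrative, not Flink defaults):

{code:java}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

import java.util.concurrent.TimeUnit;

public class TimeoutChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
            .addLast(new ReadTimeoutHandler(60, TimeUnit.SECONDS))   // fails the channel if no read happens in time
            .addLast(new WriteTimeoutHandler(60, TimeUnit.SECONDS)); // fails the channel if a write stalls
        // the application handlers (encoder/decoder, response handler) would follow here
    }
}
{code}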



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient

2018-09-26 Thread GitBox
zentol commented on a change in pull request #6763: [FLINK-10415] Fail response 
future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220642450
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
 ##
 @@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
             .thenComposeAsync(
                 channel -> {
                     ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                    CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+                    CompletableFuture<JsonResponse> future;
+                    boolean success = false;
+
                     try {
-                        httpRequest.writeTo(channel);
+                        if (handler == null) {
+                            throw new IOException("Netty pipeline was not properly initialized.");
+                        } else {
+                            httpRequest.writeTo(channel);
+                            future = handler.getJsonFuture();
+                            success = true;
+                        }
                     } catch (IOException e) {
 
 Review comment:
   We should catch all exceptions here, then this issue wouldn't have occurred 
in the first place.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient

2018-09-26 Thread GitBox
zentol commented on a change in pull request #6763: [FLINK-10415] Fail response 
future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220642544
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
 ##
 @@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
             .thenComposeAsync(
                 channel -> {
                     ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                    CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+                    CompletableFuture<JsonResponse> future;
+                    boolean success = false;
+
                     try {
-                        httpRequest.writeTo(channel);
+                        if (handler == null) {
 
 Review comment:
   Do we know when/why this happens?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629120#comment-16629120
 ] 

ASF GitHub Bot commented on FLINK-10415:


zentol commented on a change in pull request #6763: [FLINK-10415] Fail response 
future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220642737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
 ##
 @@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
             .thenComposeAsync(
                 channel -> {
                     ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                    CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+                    CompletableFuture<JsonResponse> future;
+                    boolean success = false;
+
                     try {
-                        httpRequest.writeTo(channel);
+                        if (handler == null) {
+                            throw new IOException("Netty pipeline was not properly initialized.");
+                        } else {
+                            httpRequest.writeTo(channel);
+                            future = handler.getJsonFuture();
+                            success = true;
+                        }
                     } catch (IOException e) {
-                        return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+                        future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
+                    } finally {
+                        if (!success) {
+                            channel.close();
 
 Review comment:
   why aren't we closing the channel in both cases? Not being initialized 
_properly_ doesn't necessarily mean that there's nothing to clean up.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestClient does not react to lost connection
> 
>
> Key: FLINK-10415
> URL: https://issues.apache.org/jira/browse/FLINK-10415
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1, 1.7.0, 1.5.4
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not 
> seem to react to lost connections in time. When sending a request to the 
> current leader it happened that the leader was killed just after establishing 
> the connection. Then the {{RestClient}} did not fail the connection and was 
> stuck in writing a request or retrieving a response from the lost leader. I'm 
> wondering whether we should introduce a {{ReadTimeoutHandler}} and 
> {{WriteTimeoutHandler}} to handle these problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629121#comment-16629121
 ] 

ASF GitHub Bot commented on FLINK-10415:


zentol commented on a change in pull request #6763: [FLINK-10415] Fail response 
future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220642544
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
 ##
 @@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
             .thenComposeAsync(
                 channel -> {
                     ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                    CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+                    CompletableFuture<JsonResponse> future;
+                    boolean success = false;
+
                     try {
-                        httpRequest.writeTo(channel);
+                        if (handler == null) {
 
 Review comment:
   Do we know when/why this happens?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestClient does not react to lost connection
> 
>
> Key: FLINK-10415
> URL: https://issues.apache.org/jira/browse/FLINK-10415
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1, 1.7.0, 1.5.4
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not 
> seem to react to lost connections in time. When sending a request to the 
> current leader it happened that the leader was killed just after establishing 
> the connection. Then the {{RestClient}} did not fail the connection and was 
> stuck in writing a request or retrieving a response from the lost leader. I'm 
> wondering whether we should introduce a {{ReadTimeoutHandler}} and 
> {{WriteTimeoutHandler}} to handle these problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient

2018-09-26 Thread GitBox
zentol commented on a change in pull request #6763: [FLINK-10415] Fail response 
future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220642737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
 ##
 @@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
             .thenComposeAsync(
                 channel -> {
                     ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                    CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+                    CompletableFuture<JsonResponse> future;
+                    boolean success = false;
+
                     try {
-                        httpRequest.writeTo(channel);
+                        if (handler == null) {
+                            throw new IOException("Netty pipeline was not properly initialized.");
+                        } else {
+                            httpRequest.writeTo(channel);
+                            future = handler.getJsonFuture();
+                            success = true;
+                        }
                     } catch (IOException e) {
-                        return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+                        future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
+                    } finally {
+                        if (!success) {
+                            channel.close();
 
 Review comment:
   why aren't we closing the channel in both cases? Not being initialized 
_properly_ doesn't necessarily mean that there's nothing to clean up.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10427) Port JobSubmitTest to new code base

2018-09-26 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629111#comment-16629111
 ] 

tison commented on FLINK-10427:
---

{{testFailureWhenInitializeOnMasterFails}} is covered by 
{{JobSubmissionFailsITCase#testExceptionInInitializeOnMaster}}

> Port JobSubmitTest to new code base
> ---
>
> Key: FLINK-10427
> URL: https://issues.apache.org/jira/browse/FLINK-10427
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobSubmitTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10380) Check if key is not nul before assign to group in KeyedStream

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629105#comment-16629105
 ] 

ASF GitHub Bot commented on FLINK-10380:


StephanEwen commented on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-424788457
 
 
   Concerning the other method you mentioned: that one is never used on the 
critical path, so things may be different there.
   
   Catching the null value earlier may be an even cleaner fix. But again, we 
should not introduce branches if avoidable.
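
   For reference, a JDK-only sketch of the precondition variant discussed in the issue (a hypothetical stand-in for `KeyGroupRangeAssignment#assignToKeyGroup`; the hot-path caveat above still applies):

```java
import java.util.Objects;

class KeyGroupAssignmentSketch {
    static int assignToKeyGroup(Object key, int maxParallelism) {
        // fail fast with a descriptive message instead of a bare NPE deep in the runtime
        Objects.requireNonNull(key, "Assigned key must not be null!");
        return Math.abs(key.hashCode() % maxParallelism); // placeholder for the real hash-based assignment
    }
}
```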


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Check if key is not nul before assign to group in KeyedStream
> -
>
> Key: FLINK-10380
> URL: https://issues.apache.org/jira/browse/FLINK-10380
> Project: Flink
>  Issue Type: Task
>Affects Versions: 1.6.0
>Reporter: Sayat Satybaldiyev
>Priority: Minor
>  Labels: pull-request-available
>
> If a user creates a KeyedStream and partitions by a key which might be null, 
> the Flink job throws a NullPointerException at runtime. However, the NPE that 
> Flink throws is hard to debug and understand, as it doesn't refer to the place in the Flink job.
> *Suggestion:*
> Add precondition that checks if the key is not null and throw a descriptive 
> error if it's a null.
>  
> *Job Example*:
>  
> {code:java}
> DataStream<String> stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
>  .map(x -> (String)null)
>  .keyBy(x -> x);{code}
>  
>  
> An error that is thrown:
>  
> {code:java}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.RuntimeException
>  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>  at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
> Caused by: java.lang.RuntimeException
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)16:26:43,110
>  INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC 
> service.
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> {code}
> ... 10 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6724: [FLINK-10380] Add precondition for assignToKeyGroup method

2018-09-26 Thread GitBox
StephanEwen commented on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-424788457
 
 
   Concerning the other method you mentioned: that one is never used on the 
critical path, so things may be different there.
   
   Catching the null value earlier may be an even cleaner fix. But again, we 
should not introduce branches if avoidable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629096#comment-16629096
 ] 

ASF GitHub Bot commented on FLINK-10415:


zentol commented on a change in pull request #6763: [FLINK-10415] Fail response 
future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220638812
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
 ##
 @@ -89,7 +91,78 @@ public void testInvalidVersionRejection() throws Exception {
} catch (IllegalArgumentException e) {
// expected
}
+   }
 
 +    /**
 +     * Tests that we fail the operation if the remote connection closes.
 +     */
 +    @Test
 +    public void testConnectionClosedHandling() throws Exception {
 +        final ServerSocket serverSocket = new ServerSocket(0);
 +        final String targetAddress = "localhost";
 +        final int targetPort = serverSocket.getLocalPort();
 +
 +        try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
 +            final CompletableFuture responseFuture = restClient.sendRequest(
 +                targetAddress,
 +                targetPort,
 +                new TestMessageHeaders(),
 +                EmptyMessageParameters.getInstance(),
 +                EmptyRequestBody.getInstance(),
 +                Collections.emptyList());
 +
 +            // establish connection
 +            final Socket connectionSocket = serverSocket.accept();
 +
 +            // close connection
 +            connectionSocket.close();
 +
 +            try {
 +                responseFuture.get();
 +            } catch (ExecutionException ee) {
 +                assertThat(ExceptionUtils.findThrowable(ee, ConnectionClosedException.class).isPresent(), is(true));
 +            }
 +        } finally {
 +            serverSocket.close();
 +        }
 +    }
 +
 +    /**
 +     * Tests that we fail the operation if the client closes.
 +     */
 +    @Test
 +    public void testRestClientClosedHandling() throws Exception {
 +        final ServerSocket serverSocket = new ServerSocket(0);
 +        final String targetAddress = "localhost";
 +        final int targetPort = serverSocket.getLocalPort();
 +        Socket connectionSocket = null;
 +
 +        try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
 +            final CompletableFuture responseFuture = restClient.sendRequest(
 
 Review comment:
   but it does fail with a different exception, no? although from a 
user-perspective the same issue happened.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RestClient does not react to lost connection
> 
>
> Key: FLINK-10415
> URL: https://issues.apache.org/jira/browse/FLINK-10415
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1, 1.7.0, 1.5.4
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not 
> seem to react to lost connections in time. When sending a request to the 
> current leader it happened that the leader was killed just after establishing 
> the connection. Then the {{RestClient}} did not fail the connection and was 
> stuck in writing a request or retrieving a response from the lost leader. I'm 
> wondering whether we should introduce a {{ReadTimeoutHandler}} and 
> {{WriteTimeoutHandler}} to handle these problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient

2018-09-26 Thread GitBox
zentol commented on a change in pull request #6763: [FLINK-10415] Fail response 
future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220638812
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
 ##
 @@ -89,7 +91,78 @@ public void testInvalidVersionRejection() throws Exception {
} catch (IllegalArgumentException e) {
// expected
}
+   }
 
 +    /**
 +     * Tests that we fail the operation if the remote connection closes.
 +     */
 +    @Test
 +    public void testConnectionClosedHandling() throws Exception {
 +        final ServerSocket serverSocket = new ServerSocket(0);
 +        final String targetAddress = "localhost";
 +        final int targetPort = serverSocket.getLocalPort();
 +
 +        try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
 +            final CompletableFuture responseFuture = restClient.sendRequest(
 +                targetAddress,
 +                targetPort,
 +                new TestMessageHeaders(),
 +                EmptyMessageParameters.getInstance(),
 +                EmptyRequestBody.getInstance(),
 +                Collections.emptyList());
 +
 +            // establish connection
 +            final Socket connectionSocket = serverSocket.accept();
 +
 +            // close connection
 +            connectionSocket.close();
 +
 +            try {
 +                responseFuture.get();
 +            } catch (ExecutionException ee) {
 +                assertThat(ExceptionUtils.findThrowable(ee, ConnectionClosedException.class).isPresent(), is(true));
 +            }
 +        } finally {
 +            serverSocket.close();
 +        }
 +    }
 +
 +    /**
 +     * Tests that we fail the operation if the client closes.
 +     */
 +    @Test
 +    public void testRestClientClosedHandling() throws Exception {
 +        final ServerSocket serverSocket = new ServerSocket(0);
 +        final String targetAddress = "localhost";
 +        final int targetPort = serverSocket.getLocalPort();
 +        Socket connectionSocket = null;
 +
 +        try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
 +            final CompletableFuture responseFuture = restClient.sendRequest(
 Review comment:
   but it does fail with a different exception, no? although from a 
user-perspective the same issue happened.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629064#comment-16629064
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220634306
 
 

 ##
 File path: docs/dev/table/streaming/joins.md
 ##
 @@ -0,0 +1,93 @@
+---
+title: "Joins"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+
+
+In batch processing, joins are relatively easy, since we are working on bounded, complete data sets.
+In stream processing things are a little bit more complicated, especially when it comes to handling the fact that data can change over time.
+Because of that, there are a couple of ways to actually perform the join using either the Table API or SQL.
+
+For more information regarding the syntax, please check the Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins).
+
+* This will be replaced by the TOC
+{:toc}
+
+Regular Joins
+-
+
+This is the most basic case, in which any new records or changes to either side of the join input are visible and affect the whole join result.
+If there is a new record on the left side, it will be joined with all of the previous and future records on the other side.
+
+These semantics have an important limitation:
+they require keeping both sides of the join input in state indefinitely, and resource usage will grow indefinitely as well.
+
+Example:
+{% highlight sql %}
+SELECT * FROM Orders
+INNER JOIN Product
+ON Orders.productId = Product.id
+{% endhighlight %}
+
+Time-windowed Joins
+---
+
+In this case we are restricting the scope of the join to some time window.
+This allows Flink to remove old values from the state (using [watermarks](time_attributes.html)) without affecting the correctness of the result.
+
+Example:
+{% highlight sql %}
+SELECT *
+FROM
+  Orders o,
+  Shipments s
+WHERE o.id = s.orderId AND
+  o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+Temporal Table Joins
+
+
+Temporal Table Joins allow joining a stream (left/probe side) with a table (right/build side) that changes over time.
+Each record from the probe side will be joined only with the latest version of the build side.
+That means (in contrast to [Regular Joins](#regular-joins)) if there is a new 
record on the build side,
+it will not affect the previous past results of the join.
 
 Review comment:
   "the previous past results" -> "the previous results"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629052#comment-16629052
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220613633
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -729,7 +729,7 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 
'd).select('a, 'b, 'e)
   
 Note: Time-windowed joins are a subset of regular joins that 
can be processed in a streaming fashion.
 
-A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables. 
+A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables. 
 
 Review comment:
   change link to `streaming/time_attributes.html`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629034#comment-16629034
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220598707
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -316,7 +316,7 @@ GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
 Streaming
   

-Note: All aggregates must be defined over the same window, 
i.e., same partitioning, sorting, and range. Currently, only windows with 
PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges 
with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute
+Note: All aggregates must be defined over the same window, 
i.e., same partitioning, sorting, and range. Currently, only windows with 
PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges 
with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute
 
 Review comment:
   Link should be `streaming/time_attributes.html`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629035#comment-16629035
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220636637
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -669,6 +669,30 @@ Table result = orders
 {% endhighlight %}
   
 
+
+  
+Temporal Join with Temporal Table
+Streaming
+  
+  
+Temporal Tables are 
represented as Table Functions.
 
 Review comment:
   See comments on sql.md


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629028#comment-16629028
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220613399
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -608,7 +608,7 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = 
d").select("a, b, e");
   
 Note: Time-windowed joins are a subset of regular joins that 
can be processed in a streaming fashion.
 
-A time-windowed join requires at least one equi-join predicate and 
a join condition that bounds the time on both sides. Such a condition can be 
defined by two appropriate range predicates (<, <=, >=, >) or a single equality 
predicate that compares time attributes of the same type 
(i.e., processing time or event time) of both input tables. 
+A time-windowed join requires at least one equi-join predicate and 
a join condition that bounds the time on both sides. Such a condition can be 
defined by two appropriate range predicates (<, <=, >=, >) or a single equality 
predicate that compares time attributes of the same type 
(i.e., processing time or event time) of both input tables. 
 
 Review comment:
   change link to `streaming/time_attributes.html`
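
For reference, a minimal SQL sketch of the two condition forms described in the quoted passage; table and column names are illustrative:

{% highlight sql %}
-- (1) Two range predicates that bound the time on both sides:
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId
  AND o.ordertime >= s.shiptime - INTERVAL '4' HOUR
  AND o.ordertime <= s.shiptime

-- (2) A single equality predicate on time attributes of the same type:
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId
  AND o.ordertime = s.shiptime
{% endhighlight %}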


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629044#comment-16629044
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220604070
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -769,15 +769,15 @@ The start and end timestamps of group windows as well as 
time attributes can be
 SESSION_ROWTIME(time_attr, interval)
   
   Returns the timestamp of the inclusive upper bound of the 
corresponding tumbling, hopping, or session window.
-  The resulting attribute is a rowtime attribute that can be used in 
subsequent time-based operations such as time-windowed 
joins and group window or over window 
aggregations.
+  The resulting attribute is a rowtime attribute that can be used in 
subsequent time-based operations such as time-windowed 
joins and group window or over window 
aggregations.
 
 
   
 TUMBLE_PROCTIME(time_attr, interval)
 HOP_PROCTIME(time_attr, interval, interval)
 SESSION_PROCTIME(time_attr, interval)
   
-  Returns a proctime 
attribute that can be used in subsequent time-based operations such as time-windowed joins and group window 
or over window aggregations.
+  Returns a proctime 
attribute that can be used in subsequent time-based operations such as time-windowed joins and group window 
or over window aggregations.
 
 Review comment:
   link should be `streaming/time_attributes.html#processing-time`
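
To illustrate the quoted passage, a sketch (assuming a Clicks table with a declared rowtime attribute) of using a window's rowtime in a subsequent group window aggregation:

{% highlight sql %}
SELECT
  TUMBLE_START(rowtime, INTERVAL '1' DAY) AS day_start,
  SUM(cnt) AS total
FROM (
  -- TUMBLE_ROWTIME returns a rowtime attribute that the outer
  -- time-based operation can consume.
  SELECT
    TUMBLE_ROWTIME(rowtime, INTERVAL '1' HOUR) AS rowtime,
    COUNT(*) AS cnt
  FROM Clicks
  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)
)
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY)
{% endhighlight %}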


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629091#comment-16629091
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220635686
 
 

 ##
 File path: docs/dev/table/streaming/joins.md
 ##
 @@ -0,0 +1,93 @@
+---
+title: "Joins"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+
+
+In batch processing joins are relatively easy, since we are working on 
bounded, complete data sets.
+In stream processing things are a little bit more complicated, especially when 
it comes to handling the fact that data can change over time.
+Because of that, there are a couple of ways to actually perform the join using 
either the Table API or SQL.
+
+For more information regarding syntax, please check the Joins sections in [Table 
API](../tableApi.html#joins) and [SQL](../sql.html#joins).
+
+* This will be replaced by the TOC
+{:toc}
+
+Regular Joins
+-
+
+This is the most basic case in which any new records or changes to either side 
of the join input are visible and affect the whole join result.
+If there is a new record on the left side, it will be joined with all of the 
previous and future records on the other side.
+
+Such semantics have an important limitation:
+they require keeping both sides of the join input in state indefinitely, and 
resource usage will grow indefinitely as well.
+
+Example:
+{% highlight sql %}
+SELECT * FROM Orders
+INNER JOIN Product
+ON Orders.productId = Product.id
+{% endhighlight %}
+
+Time-windowed Joins
+---
+
+In this case we restrict the scope of the join to some time window.
+This allows Flink to remove old values from the state (using 
[watermarks](time_attributes.html)) without affecting the correctness of the 
result.
+
+Example:
+{% highlight sql %}
+SELECT *
+FROM
+  Orders o,
+  Shipments s
+WHERE o.id = s.orderId AND
+  o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+Temporal Table Joins
+
+
+Temporal Table Joins allow joining a stream (left/probe side) with a table 
(right/build side) that changes over time.
+Each record from the probe side will be joined only with the latest 
version of the build side.
+That means (in contrast to [Regular Joins](#regular-joins)) that if there is a new 
record on the build side,
+it will not affect the previous results of the join.
+This again allows Flink to limit the number of elements that must be kept in 
state.
+In order to support updates (overwrites) of previous values on the build side 
table, this table must define a primary key.
+
+Compared to [Time-windowed Joins](#time-windowed-joins),
+Temporal Table Joins do not define a time window within which the 
records will be joined.
+Records from the probe side are joined with the most recent versions of the 
build side, and records on the build side might be arbitrarily old.
 
 Review comment:
   We need to rephrase "most recent version" once we support event time. Maybe 
it makes sense to explain this a bit more generally. This would also help to 
understand the syntax better (why do we need to pass a proc time attribute into 
the temporal table function?)
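
For illustration, a sketch of the processing-time variant the comment refers to (Orders and Rates are the example tables from the quoted page; the use of o.proctime follows the review's suggestion):

{% highlight sql %}
-- Rates(o.proctime) returns the version of the Rates table that is valid at
-- the probe record's processing time; this is why a time attribute has to be
-- passed into the temporal table function.
SELECT
  o.amount * r.rate AS amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.proctime)) AS r
WHERE r.currency = o.currency
{% endhighlight %}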


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629030#comment-16629030
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220598373
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -343,7 +343,7 @@ WINDOW w AS (
 {% highlight sql %}
 SELECT DISTINCT users FROM Orders
 {% endhighlight %}
-   Note: For streaming queries, the required state to compute the 
query result might grow infinitely depending on the number of distinct fields. 
Please provide a query configuration with a valid retention interval to prevent 
excessive state size. See Streaming Concepts for 
details.
+   Note: For streaming queries, the required state to compute the 
query result might grow infinitely depending on the number of distinct fields. 
Please provide a query configuration with a valid retention interval to prevent 
excessive state size. See Streaming Concepts for 
details.
 
 Review comment:
   Link should be `streaming/query_configuration.html` and change "Streaming 
Concepts" to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629061#comment-16629061
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220632174
 
 

 ##
 File path: docs/dev/table/streaming/joins.md
 ##
 @@ -0,0 +1,93 @@
+---
+title: "Joins"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+
+
+In batch processing joins are relatively easy, since we are working on 
bounded, complete data sets.
+In stream processing things are a little bit more complicated, especially when 
it comes to handling the fact that data can change over time.
+Because of that, there are a couple of ways to actually perform the join using 
either the Table API or SQL.
+
+For more information regarding syntax, please check the Joins sections in [Table 
API](../tableApi.html#joins) and [SQL](../sql.html#joins).
+
+* This will be replaced by the TOC
+{:toc}
+
+Regular Joins
+-
+
+This is the most basic case in which any new records or changes to either side 
of the join input are visible and affect the whole join result.
+If there is a new record on the left side, it will be joined with all of the 
previous and future records on the other side.
+
+Such semantics have an important limitation:
+they require keeping both sides of the join input in state indefinitely, and 
resource usage will grow indefinitely as well.
+
+Example:
+{% highlight sql %}
+SELECT * FROM Orders
+INNER JOIN Product
+ON Orders.productId = Product.id
+{% endhighlight %}
+
+Time-windowed Joins
+---
+
+In this case we restrict the scope of the join to some time window.
+This allows Flink to remove old values from the state (using 
[watermarks](time_attributes.html)) without affecting the correctness of the 
result.
+
+Example:
+{% highlight sql %}
+SELECT *
+FROM
+  Orders o,
+  Shipments s
+WHERE o.id = s.orderId AND
+  o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+Temporal Table Joins
+
+
+Temporal Table Joins allow joining a stream (left/probe side) with a table 
(right/build side) that changes over time.
 
 Review comment:
   "A Temporal Table Join joins an append-only table (left input) with a 
Temporal Table(link) (right input), i.e., a table that changes over time and 
tracks its changes.
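
The notion of "a table that tracks its changes" can be made concrete with plain SQL over a hypothetical history table (a conceptual sketch, not the PR's syntax):

{% highlight sql %}
-- rates_history(currency, rate, proctime) appends a row for every change.
-- The version of the table at a point in time t is, per primary key
-- (currency), the latest row with proctime <= t.
SELECT currency, rate
FROM rates_history h
WHERE h.proctime = (
  SELECT MAX(h2.proctime)
  FROM rates_history h2
  WHERE h2.currency = h.currency
    AND h2.proctime <= TIMESTAMP '2018-09-26 10:58:00')
{% endhighlight %}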


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629065#comment-16629065
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220618395
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -502,6 +502,30 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t 
AS tag ON TRUE
 Note: Currently, only literal TRUE is supported 
as a predicate for a left outer join against a lateral table.
   
 
+
+  
+Temporal Join with Temporal Table
 
 Review comment:
   "Temporal Join with Temporal Table" sounds a bit repetitive. How about "Join 
with Temporal Table"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629027#comment-16629027
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220599532
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -287,7 +287,7 @@ SELECT PRETTY_PRINT(user) FROM Orders
 Result Updating
   
   
-Note: GroupBy on a streaming table produces an updating 
result. See the Streaming Concepts page for 
details.
+Note: GroupBy on a streaming table produces an updating 
result. See the Streaming Concepts page for details.
 
 Review comment:
   change link to `streaming/dynamic_tables.html`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629088#comment-16629088
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220630952
 
 

 ##
 File path: docs/dev/table/streaming/joins.md
 ##
 @@ -0,0 +1,93 @@
+---
+title: "Joins"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+
+
+In batch processing joins are relatively easy, since we are working on 
bounded, complete data sets.
+In stream processing things are a little bit more complicated, especially when 
it comes to handling the fact that data can change over time.
+Because of that, there are a couple of ways to actually perform the join using 
either the Table API or SQL.
+
+For more information regarding syntax, please check the Joins sections in [Table 
API](../tableApi.html#joins) and [SQL](../sql.html#joins).
+
+* This will be replaced by the TOC
+{:toc}
+
+Regular Joins
+-
+
+This is the most basic case in which any new records or changes to either side 
of the join input are visible and affect the whole join result.
+If there is a new record on the left side, it will be joined with all of the 
previous and future records on the other side.
+
+Such semantics have an important limitation:
+they require keeping both sides of the join input in state indefinitely, and 
resource usage will grow indefinitely as well.
+
+Example:
+{% highlight sql %}
+SELECT * FROM Orders
+INNER JOIN Product
+ON Orders.productId = Product.id
+{% endhighlight %}
+
+Time-windowed Joins
+---
+
+In this case we restrict the scope of the join to some time window.
 
 Review comment:
   "A time-windowed join is defined by a join predicate that checks if the time 
attributes(link) of the input records are within a time-window. Since time 
attributes are quasi-monotonically increasing, Flink can remove ..." 
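
A sketch of how such a predicate bounds the state (the tables are the example ones from the quoted page; the retention reasoning is an informal reading, not a statement about the implementation):

{% highlight sql %}
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId
  AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
-- Once the watermark has passed o.ordertime + 4 hours, no future shipment
-- can join this order anymore, so its row can be dropped from state.
{% endhighlight %}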


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629059#comment-16629059
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220603395
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -760,7 +760,7 @@ The start and end timestamps of group windows as well as 
time attributes can be
 SESSION_END(time_attr, interval)
   
   Returns the timestamp of the exclusive upper bound of the 
corresponding tumbling, hopping, or session window.
-Note: The exclusive upper bound timestamp cannot be 
used as a rowtime attribute in 
subsequent time-based operations, such as time-windowed 
joins and group window or over window 
aggregations.
+Note: The exclusive upper bound timestamp cannot be 
used as a rowtime attribute in 
subsequent time-based operations, such as time-windowed 
joins and group window or over window 
aggregations.
 
 Review comment:
   link should be `streaming/time_attributes.html`, change "rowtime attribute" 
to "time attribute"
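
A sketch of the distinction (the Clicks table and its rowtime attribute are illustrative):

{% highlight sql %}
SELECT
  user,
  -- a time attribute, usable in subsequent time-based operations:
  SESSION_ROWTIME(rowtime, INTERVAL '30' MINUTE) AS s_rowtime,
  -- the exclusive upper bound, a plain timestamp only:
  SESSION_END(rowtime, INTERVAL '30' MINUTE) AS s_end,
  COUNT(*) AS cnt
FROM Clicks
GROUP BY SESSION(rowtime, INTERVAL '30' MINUTE), user
{% endhighlight %}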


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629072#comment-16629072
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220625871
 
 

 ##
 File path: docs/dev/table/streaming/joins.md
 ##
 @@ -0,0 +1,93 @@
+---
+title: "Joins"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+
+
+In batch processing joins are relatively easy, since we are working on 
bounded, complete data sets.
+In stream processing things are a little bit more complicated, especially when 
it comes to handling the fact that data can change over time.
+Because of that, there are a couple of ways to actually perform the join using 
either the Table API or SQL.
+
+For more information regarding syntax, please check the Joins sections in [Table 
API](../tableApi.html#joins) and [SQL](../sql.html#joins).
 
 Review comment:
   `the Joins sections`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629080#comment-16629080
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220613752
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1035,7 +1035,7 @@ val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
 val right = ds2.toTable(tableEnv, 'a)
 val result = left.select('a, 'b, 'c).where('a.in(right))
 {% endhighlight %}
-Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
+Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
 
 Review comment:
   change link to `streaming/query_configuration.html` and "Streaming Concepts" 
to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629038#comment-16629038
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220600142
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -459,7 +459,7 @@ ANY.in(TABLE)
   
   
 Returns TRUE if ANY is equal to a row returned by sub-query 
TABLE.
-Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
+Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
 
 Review comment:
   change link to `streaming/query_configuration.html` and "Streaming Concepts" 
to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629053#comment-16629053
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220600253
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -639,7 +639,7 @@ ANY.in(TABLE)
   
   
 Returns TRUE if ANY is equal to a row returned by sub-query 
TABLE.
-Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
+Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
 
 Review comment:
   change link to `streaming/query_configuration.html` and "Streaming Concepts" 
to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629083#comment-16629083
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220614156
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1430,7 +1430,7 @@ A session window is defined by using the `Session` class 
as follows:
 
 
   on
-  The time attribute to group (time interval) or sort (row count) on. 
For batch queries this might be any Long or Timestamp attribute. For streaming 
queries this must be a declared 
event-time or processing-time time attribute.
+  The time attribute to group (time interval) or sort (row count) on. 
For batch queries this might be any Long or Timestamp attribute. For streaming 
queries this must be a declared 
event-time or processing-time time attribute.
 
 Review comment:
   change link to `streaming/time_attributes.html`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629069#comment-16629069
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220612780
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -420,7 +420,7 @@ orders.groupBy("users").select("users, 
myUdagg.distinct(points) as myDistinctRes
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
 {% endhighlight %}
-Note: For streaming queries, the required state to compute 
the query result might grow infinitely depending on the number of distinct 
fields. Please provide a query configuration with a valid retention interval to 
prevent excessive state size. See Streaming 
Concepts for details.
+Note: For streaming queries, the required state to compute 
the query result might grow infinitely depending on the number of distinct 
fields. Please provide a query configuration with a valid retention interval to 
prevent excessive state size. See Streaming Concepts 
for details.
 
 Review comment:
   change link to `streaming/query_configuration.html` and "Streaming Concepts" 
to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629049#comment-16629049
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220602885
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -728,7 +728,7 @@ Group windows are defined in the `GROUP BY` clause of a 
SQL query. Just like que
 
  Time Attributes
 
-For SQL queries on streaming tables, the `time_attr` argument of the group 
window function must refer to a valid time attribute that specifies the 
processing time or event time of rows. See the [documentation of time 
attributes](streaming.html#time-attributes) to learn how to define time 
attributes.
+For SQL queries on streaming tables, the `time_attr` argument of the group 
window function must refer to a valid time attribute that specifies the 
processing time or event time of rows. See the [documentation of time 
attributes](streaming#time_attributes.html) to learn how to define time 
attributes.
 
 Review comment:
   link should be `streaming/time_attributes.html`
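
For reference, a minimal example of a valid time_attr (assuming Orders declares a rowtime attribute):

{% highlight sql %}
SELECT
  user,
  TUMBLE_START(rowtime, INTERVAL '1' DAY) AS wStart,
  SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
{% endhighlight %}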


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629037#comment-16629037
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220602421
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -591,7 +591,7 @@ WHERE product IN (
 SELECT product FROM NewProducts
 )
 {% endhighlight %}
-Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
+Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
 
 Review comment:
   Link should be `streaming/query_configuration.html` and change "Streaming 
Concepts" to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629071#comment-16629071
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220629382
 
 

 ##
 File path: docs/dev/table/streaming/joins.md
 ##
 @@ -0,0 +1,93 @@
+---
+title: "Joins"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+
+
+In batch processing joins are relatively easy, since we are working on 
bounded, complete data sets.
+In stream processing things are a little bit more complicated, especially when 
it comes to handling the fact that data can change over time.
+Because of that, there are a couple of ways to actually perform the join using 
either the Table API or SQL.
+
+For more information regarding syntax, please check the Joins sections in [Table 
API](../tableApi.html#joins) and [SQL](../sql.html#joins).
+
+* This will be replaced by the TOC
+{:toc}
+
+Regular Joins
+-
+
+This is the most basic case in which any new records or changes to either side 
of the join input are visible and affect the whole join result.
+If there is a new record on the left side, it will be joined with all of the 
previous and future records on the other side.
 
 Review comment:
   "For example, if there is a new ..."


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629031#comment-16629031
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220621269
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -502,6 +502,30 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t 
AS tag ON TRUE
 Note: Currently, only literal TRUE is supported 
as a predicate for a left outer join against a lateral table.
   
 
+
+  
+Temporal Join with Temporal Table
+Streaming
+  
+  
+Temporal Tables are 
represented as Table Functions.
 
 Review comment:
   Extend the description?
   
   > Temporal Tables(link) are tables that track their changes over time. A 
Temporal Table Function(link) provides access to the state of a Temporal Table 
at a specific point in time. The syntax to join a table with a Temporal Table 
Function is the same as in Join with Table Functions.
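
A sketch contrasting the two (both queries are illustrative; unnest_udtf is the user-defined table function from the quoted context):

{% highlight sql %}
-- Join with a regular table function:
SELECT o.id, t.tag
FROM Orders AS o,
  LATERAL TABLE (unnest_udtf(o.tags)) AS t(tag)

-- Join with a temporal table function: same LATERAL TABLE syntax, but the
-- time attribute argument selects the table version to join against.
SELECT o.amount * r.rate AS amount
FROM Orders AS o,
  LATERAL TABLE (Rates(o.proctime)) AS r
WHERE o.currency = r.currency
{% endhighlight %}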


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629082#comment-16629082
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220618900
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -502,6 +502,30 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t 
AS tag ON TRUE
 Note: Currently, only literal TRUE is supported 
as a predicate for a left outer join against a lateral table.
   
 
+
+  
+Temporal Join with Temporal Table
 
 Review comment:
   I think we can remove the link in the heading cell. There's another link in 
the description cell.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629078#comment-16629078
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220613508
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -718,7 +718,7 @@ val leftOuterResult = left.leftOuterJoin(right, 'a === 
'd).select('a, 'b, 'e)
 val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
 val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
 {% endhighlight %}
-Note: For streaming queries, the required state to compute the query 
result might grow infinitely depending on the number of distinct input rows. 
Please provide a query configuration with a valid retention interval to prevent 
excessive state size. See Streaming Concepts for 
details.
+Note: For streaming queries, the required state to compute the query 
result might grow infinitely depending on the number of distinct input rows. 
Please provide a query configuration with a valid retention interval to prevent 
excessive state size. See Streaming Concepts for 
details.
 
 Review comment:
   change link to `streaming/query_configuration.html` and "Streaming Concepts" 
to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629032#comment-16629032
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220613700
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -915,7 +915,7 @@ tableEnv.registerTable("RightTable", right);
 Table result = left.select("a, b, c").where("a.in(RightTable)");
 {% endhighlight %}
 
-Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
+Note: For streaming queries, the operation is rewritten into a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with a valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
 
 Review comment:
   change link to `streaming/query_configuration.html` and "Streaming Concepts" 
to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629086#comment-16629086
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220633567
 
 

 ##
 File path: docs/dev/table/streaming/joins.md
 ##
 @@ -0,0 +1,93 @@
+---
+title: "Joins"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+
+
+In batch processing joins are relatively easy, since we are working on 
bounded, complete data sets.
+In stream processing things are a little bit more complicated, especially when 
it comes to handling the fact that data can change over time.
+Because of that, there are a couple of ways to actually perform the join using 
either the Table API or SQL.
+
+For more information regarding syntax, please check the Joins sections in [Table 
API](../tableApi.html#joins) and [SQL](../sql.html#joins).
+
+* This will be replaced by the TOC
+{:toc}
+
+Regular Joins
+-
+
+This is the most basic case in which any new records or changes to either side 
of the join input are visible and affect the whole join result.
+If there is a new record on the left side, it will be joined with all of the 
previous and future records on the other side.
+
+Such semantics have an important limitation:
+they require keeping both sides of the join input in state indefinitely, and 
resource usage will grow indefinitely as well.
+
+Example:
+{% highlight sql %}
+SELECT * FROM Orders
+INNER JOIN Product
+ON Orders.productId = Product.id
+{% endhighlight %}
+
+Time-windowed Joins
+---
+
+In this case we restrict the scope of the join to some time window.
+This allows Flink to remove old values from the state (using 
[watermarks](time_attributes.html)) without affecting the correctness of the 
result.
+
+Example:
+{% highlight sql %}
+SELECT *
+FROM
+  Orders o,
+  Shipments s
+WHERE o.id = s.orderId AND
+  o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+Temporal Table Joins
+
+
+Temporal Table Joins allow joining a stream (left/probe side) with a table 
(right/build side) that changes over time.
+Each record from the probe side will be joined only with the latest 
version of the build side.
+That means (in contrast to [Regular Joins](#regular-joins)) that if there is a new 
record on the build side,
+it will not affect the previous results of the join.
+This again allows Flink to limit the number of elements that must be kept in 
state.
+In order to support updates (overwrites) of previous values on the build side 
table, this table must define a primary key.
+
+Compared to [Time-windowed Joins](#time-windowed-joins),
+Temporal Table Joins do not define a time window within which the 
records will be joined.
+Records from the probe side are joined with the most recent versions of the 
build side, and records on the build side might be arbitrarily old.
+As time passes, the previous, no longer needed versions of a record (for a 
given primary key) are removed from the state.
+
+Example:
+{% highlight sql %}
+SELECT
+  o.amount * r.rate AS amount
+FROM
+  Orders AS o,
+  LATERAL TABLE (Rates(o.rowtime)) AS r
 
 Review comment:
   `o.rowtime` -> `o.proctime`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629066#comment-16629066
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220609016
 
 

 ##
 File path: docs/dev/table/streaming/index.md
 ##
 @@ -0,0 +1,73 @@
+---
+title: "Streaming Concepts"
+nav-id: streaming_tableapi
+nav-parent_id: tableapi
+nav-pos: 10
+is_beta: false
+nav-show_overview: true
+---
+
+
+Flink's [Table API](../tableApi.html) and [SQL support](../sql.html) are 
unified APIs for batch and stream processing. This means that Table API and SQL 
queries have the same semantics regardless of whether their input is bounded batch 
input or unbounded stream input. Because the relational algebra and SQL were 
originally designed for batch processing, relational queries on unbounded 
streaming input are not as well understood as relational queries on bounded 
batch input.
+
+In this section, we explain concepts, practical limitations, and 
stream-specific configuration parameters of Flink's relational APIs on 
streaming data.
+
+Relational Queries on Data Streams
+--
+
+SQL and the relational algebra have not been designed with streaming data in 
mind. As a consequence, there are a few conceptual gaps between relational 
algebra (and SQL) and stream processing.
+
+<table class="table">
+  <tr>
+    <th>Relational Algebra / SQL</th>
+    <th>Stream Processing</th>
+  </tr>
+  <tr>
+    <td>Relations (or tables) are bounded (multi-)sets of tuples.</td>
+    <td>A stream is an infinite sequence of tuples.</td>
+  </tr>
+  <tr>
+    <td>A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data.</td>
+    <td>A streaming query cannot access all data when it is started and has to "wait" for data to be streamed in.</td>
+  </tr>
+  <tr>
+    <td>A batch query terminates after it has produced a fixed-sized result.</td>
+    <td>A streaming query continuously updates its result based on the received records and never completes.</td>
+  </tr>
+</table>
+
+Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes outdated when the base tables of its definition query are modified. *Eager View Maintenance* is a technique that updates a materialized view as soon as its base tables are updated.
+
+The connection between eager view maintenance and SQL queries on streams 
becomes obvious if we consider the following:
+
+- A database table is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called a *changelog stream*.
+- A materialized view is defined as a SQL query. In order to update the view, the query continuously processes the changelog streams of the view's base relations.
+- The materialized view is the result of the streaming SQL query.
+
+With these points in mind, we introduce the following concepts:
+
+* [Dynamic Tables & Continuous Queries]({{ site.baseurl }}/dev/table/streaming/dynamic_tables.html)
+* [Time attributes]({{ site.baseurl }}/dev/table/streaming/time_attributes.html)
+* [Joins]({{ site.baseurl }}/dev/table/streaming/joins.html)
 
 Review comment:
   Change "Joins" to "Joins in Continuous Queries" to emphasize that this is 
not about joins in general?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

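To make the materialized-view analogy from the diff above concrete, here is a minimal sketch of a continuous query whose result is eagerly maintained as new records arrive (Flink 1.7-era Java API; the `clicks` stream and its fields are hypothetical):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ContinuousQuerySketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // A changelog-like input stream of (username, url) click events.
    DataStream<Tuple2<String, String>> clicks = env.fromElements(
        Tuple2.of("Mary", "/home"),
        Tuple2.of("Bob", "/cart"),
        Tuple2.of("Mary", "/prod?id=1"));
    tEnv.registerDataStream("clicks", clicks, "username, url");

    // The continuous query: conceptually an eagerly maintained materialized view.
    Table counts = tEnv.sqlQuery(
        "SELECT username, COUNT(url) AS cnt FROM clicks GROUP BY username");

    // The retract stream is the changelog of the maintained result:
    // (true, row) entries are insertions, (false, row) entries retract
    // previously emitted rows when a count is updated.
    tEnv.toRetractStream(counts, Row.class).print();
    env.execute();
  }
}
{code}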
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629046#comment-16629046
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220601206
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -297,7 +297,7 @@ value IN (sub-query)
   
   
 Returns TRUE if value is equal to a row returned by 
sub-query.
-Note: For streaming queries the operation is rewritten in a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
+Note: For streaming queries the operation is rewritten in a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
 
 Review comment:
   change link to `streaming/query_configuration.html` and "Streaming Concepts" 
to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
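
A minimal sketch of the query configuration with a state retention interval that the note above refers to (Flink 1.7-era Java API; the retention values and the `Orders`/`Users` tables are illustrative assumptions):

{code:java}
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetentionSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // Keep state for idle keys at least 12 and at most 24 hours before cleanup,
    // so that the rewritten join/group state cannot grow without bound.
    StreamQueryConfig qConf = tEnv.queryConfig()
        .withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

    // Assume "Orders" and "Users" were registered before; the IN sub-query is
    // rewritten into a join and group operation whose state the config bounds.
    Table result = tEnv.sqlQuery(
        "SELECT o.id, o.amount FROM Orders o WHERE o.user_id IN (SELECT id FROM Users)");

    // Pass the configuration when emitting the result.
    tEnv.toRetractStream(result, Row.class, qConf).print();
    env.execute();
  }
}
{code}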


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629050#comment-16629050
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220601252
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -285,7 +285,7 @@ EXISTS (sub-query)
   
   
 Returns TRUE if sub-query returns at least one row. Only 
supported if the operation can be rewritten in a join and group operation.
-Note: For streaming queries the operation is rewritten in a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
+Note: For streaming queries the operation is rewritten in a 
join and group operation. The required state to compute the query result might 
grow infinitely depending on the number of distinct input rows. Please provide 
a query configuration with valid retention interval to prevent excessive state 
size. See Streaming Concepts for details.
 
 Review comment:
   change link to `streaming/query_configuration.html` and "Streaming Concepts" 
to "Query Configuration"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support enrichment joins in Flink SQL/Table API
> ---
>
> Key: FLINK-9712
> URL: https://issues.apache.org/jira/browse/FLINK-9712
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

