[jira] [Resolved] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui resolved FLINK-10145. - Resolution: Fixed Fix Version/s: 1.7.0 Implemented in 1.7.0: 24af70fdecbbb66e8555df7aca35a92a2f1aa7ac > Add replace supported in TableAPI and SQL > - > > Key: FLINK-10145 > URL: https://issues.apache.org/jira/browse/FLINK-10145 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Guibo Pan >Assignee: Guibo Pan >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > replace is a useful function for String. > For example: > {code:java} > select replace("Hello World", "World", "Flink") // returns "Hello Flink" > select replace("ababab", "abab", "z") // returns "zab" > {code} > It is supported as a UDF in Hive; for more details please see [1]. > [1]: > https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-StringFunctions -- This message was sent by Atlassian JIRA (v7.6.3#76005)
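For reference, the non-overlapping, left-to-right semantics shown in the examples above match the behaviour of plain `java.lang.String.replace(CharSequence, CharSequence)`; the following is a small sketch of the expected results (an illustration only, not the Flink implementation):

```java
public class ReplaceSemantics {
    public static void main(String[] args) {
        // Literal (non-regex) replacement, scanning left to right.
        System.out.println("Hello World".replace("World", "Flink")); // Hello Flink

        // After "abab" is matched at index 0 and replaced, scanning resumes
        // after the match, so the overlapping "abab" at index 2 is not touched.
        System.out.println("ababab".replace("abab", "z")); // zab
    }
}
```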
[GitHub] asfgit closed pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL
asfgit closed pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md index 11082941385..00465727399 100644 --- a/docs/dev/table/functions.md +++ b/docs/dev/table/functions.md @@ -2448,6 +2448,18 @@ SUBSTRING(string FROM integer1 [ FOR integer2 ]) + + +{% highlight text %} +REPLACE(string1, string2, string3) +{% endhighlight %} + + +Returns a new string which replaces string2 with string3 (non-overlapping) from string1 +E.g., REPLACE("hello world", "world", "flink") returns "hello flink"; REPLACE("ababab", "abab", "z") returns "zab". + + + {% highlight text %} @@ -2688,6 +2700,18 @@ STRING.substring(INT1, INT2) + + +{% highlight java %} +STRING1.replace(STRING2, STRING3) +{% endhighlight %} + + +Returns a new string which replaces STRING2 with STRING3 (non-overlapping) from STRING1. +E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns 'zab'. + + + {% highlight java %} @@ -2927,6 +2951,18 @@ STRING.substring(INT1, INT2) + + +{% highlight scala %} +STRING1.replace(STRING2, STRING3) +{% endhighlight %} + + +Returns a new string which replaces STRING2 with STRING3 (non-overlapping) from STRING1. +E.g., "hello world".replace("world", "flink") returns "hello flink"; "ababab".replace("abab", "z") returns "zab". 
+ + + {% highlight scala %} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index 14adb71bbab..3c726678b24 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -471,6 +471,13 @@ trait ImplicitExpressionOperations { } } + /** +* Creates a new string of the given string with non-overlapping occurrences +* of given search replaced with replacement. +*/ + def replace(search: Expression, replacement: Expression) = +Replace(expr, search, replacement) + /** * Returns the length of a string. */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index e32005213eb..bd05f9bd71e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -153,6 +153,12 @@ object FunctionGenerator { STRING_TYPE_INFO, BuiltInMethods.REGEXP_REPLACE) + addSqlFunctionMethod( +REPLACE, +Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO), +STRING_TYPE_INFO, +BuiltInMethod.REPLACE.method) + addSqlFunctionMethod( FROM_BASE64, Seq(STRING_TYPE_INFO), diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala index 5734b369fdf..f5e6ad2fff3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala @@ -511,3 +511,27 @@ case class Repeat(str: Expression, n: Expression) extends Expression with InputT override def toString: String = s"($str).repeat($n)" } + +/** + * Returns the string `str` with all non-overlapping occurrences +* of `search` replaced with `replacement`. +*/ + case class Replace(str: Expression, + search: Expression, + replacement: Expression) extends Expression with InputTypeSpec { + +def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str)) + +override private[flink] def children: Seq[Expression] = str :: search :: replacement :: Nil + +override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO + +override private[flink] def expectedTypes: Seq[TypeInformation[_]] = +
[jira] [Commented] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629792#comment-16629792 ] ASF GitHub Bot commented on FLINK-10145: asfgit closed pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6576 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md index 11082941385..00465727399 100644 --- a/docs/dev/table/functions.md +++ b/docs/dev/table/functions.md @@ -2448,6 +2448,18 @@ SUBSTRING(string FROM integer1 [ FOR integer2 ]) + + +{% highlight text %} +REPLACE(string1, string2, string3) +{% endhighlight %} + + +Returns a new string which replaces string2 with string3 (non-overlapping) from string1 +E.g., REPLACE("hello world", "world", "flink") returns "hello flink"; REPLACE("ababab", "abab", "z") returns "zab". + + + {% highlight text %} @@ -2688,6 +2700,18 @@ STRING.substring(INT1, INT2) + + +{% highlight java %} +STRING1.replace(STRING2, STRING3) +{% endhighlight %} + + +Returns a new string which replaces STRING2 with STRING3 (non-overlapping) from STRING1. +E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns 'zab'. + + + {% highlight java %} @@ -2927,6 +2951,18 @@ STRING.substring(INT1, INT2) + + +{% highlight scala %} +STRING1.replace(STRING2, STRING3) +{% endhighlight %} + + +Returns a new string which replaces STRING2 with STRING3 (non-overlapping) from STRING1. +E.g., "hello world".replace("world", "flink") returns "hello flink"; "ababab".replace("abab", "z") returns "zab". 
+ + + {% highlight scala %} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index 14adb71bbab..3c726678b24 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -471,6 +471,13 @@ trait ImplicitExpressionOperations { } } + /** +* Creates a new string of the given string with non-overlapping occurrences +* of given search replaced with replacement. +*/ + def replace(search: Expression, replacement: Expression) = +Replace(expr, search, replacement) + /** * Returns the length of a string. */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index e32005213eb..bd05f9bd71e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -153,6 +153,12 @@ object FunctionGenerator { STRING_TYPE_INFO, BuiltInMethods.REGEXP_REPLACE) + addSqlFunctionMethod( +REPLACE, +Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO), +STRING_TYPE_INFO, +BuiltInMethod.REPLACE.method) + addSqlFunctionMethod( FROM_BASE64, Seq(STRING_TYPE_INFO), diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala index 5734b369fdf..f5e6ad2fff3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala @@ -511,3 +511,27 @@ case class Repeat(str: Expression, n: Expression) extends Expression with InputT override def toString: String = s"($str).repeat($n)" } + +/** + * Returns the string `str` with all non-overlapping occurrences +* of `search` replaced with `replacement`. +*/ + case class Replace(str: Expression, + search: Expression, + replacement: Expression) extends Expression with InputTypeSpec { + +def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str)) + +override private[flink] def children:
[jira] [Comment Edited] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component
[ https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629769#comment-16629769 ] tison edited comment on FLINK-10429 at 9/27/18 5:18 AM: I like the proposal to treat the scheduler as a separate component. It is helpful for further optimization of scheduling, such as FLINK-10240. To [~zhuzh], [~tiemsn]: I think the first step in achieving this redesign would be the extraction part; maybe you can take this document into consideration? Also, the link to FLINK-10240 is broken; the correct one is https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/ (without the "edit" part). And I am afraid that this document being READ-ONLY would prevent further discussion, since thoughts cannot be commented on it. For more information, our users have started looking for more flexible scheduling strategies. I wish this redesign could help. was (Author: tison): I like the proposal to treat the scheduler as a separate component. It is helpful for further optimization of scheduling, such as FLINK-10240. To [~zhuzh], [~tiemsn]: I think the first step in achieving this redesign would be the extraction part; maybe you can take this document into consideration? Also, the link to FLINK-10240 is broken; the correct one is https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/ . And I am afraid that this document being READ-ONLY would prevent further discussion, since thoughts cannot be commented on it. For more information, our users have started looking for more flexible scheduling strategies. I wish this redesign could help. > Redesign Flink Scheduling, introducing dedicated Scheduler component > > > Key: FLINK-10429 > URL: https://issues.apache.org/jira/browse/FLINK-10429 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > This epic tracks the redesign of scheduling in Flink. 
Scheduling is currently > a concern that is scattered across different components, mainly the > ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on > the granularity of individual tasks, which makes holistic scheduling > strategies hard to implement. In this epic we aim to introduce a dedicated > Scheduler component that can support use cases like auto-scaling, > local-recovery, and resource optimized batch. > The design for this feature is developed here: > https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component
[ https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629769#comment-16629769 ] tison edited comment on FLINK-10429 at 9/27/18 5:18 AM: I like the proposal to treat the scheduler as a separate component. It is helpful for further optimization of scheduling, such as FLINK-10240. To [~zhuzh], [~tiemsn]: I think the first step in achieving this redesign would be the extraction part; maybe you can take this document into consideration? Also, the link to FLINK-10240 is broken; the correct one is https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/ (without the "edit" part). And I am afraid that this document being READ-ONLY would prevent further discussion, since thoughts cannot be commented on it. For more information, our users have started looking for more flexible scheduling strategies[1]. I wish this redesign could help. [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Scheduling-sources-td23344.html was (Author: tison): I like the proposal to treat the scheduler as a separate component. It is helpful for further optimization of scheduling, such as FLINK-10240. To [~zhuzh], [~tiemsn]: I think the first step in achieving this redesign would be the extraction part; maybe you can take this document into consideration? Also, the link to FLINK-10240 is broken; the correct one is https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/ (without the "edit" part). And I am afraid that this document being READ-ONLY would prevent further discussion, since thoughts cannot be commented on it. For more information, our users have started looking for more flexible scheduling strategies. I wish this redesign could help. 
> Redesign Flink Scheduling, introducing dedicated Scheduler component > > > Key: FLINK-10429 > URL: https://issues.apache.org/jira/browse/FLINK-10429 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > This epic tracks the redesign of scheduling in Flink. Scheduling is currently > a concern that is scattered across different components, mainly the > ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on > the granularity of individual tasks, which makes holistic scheduling > strategies hard to implement. In this epic we aim to introduce a dedicated > Scheduler component that can support use cases like auto-scaling, > local-recovery, and resource optimized batch. > The design for this feature is developed here: > https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component
[ https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629769#comment-16629769 ] tison commented on FLINK-10429: --- I like the proposal to treat the scheduler as a separate component. It is helpful for further optimization of scheduling, such as FLINK-10240. To [~zhuzh], [~tiemsn]: I think the first step in achieving this redesign would be the extraction part; maybe you can take this document into consideration? Also, the link to FLINK-10240 is broken; the correct one is https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/ . And I am afraid that this document being READ-ONLY would prevent further discussion, since thoughts cannot be commented on it. For more information, our users have started looking for more flexible scheduling strategies. I wish this redesign could help. > Redesign Flink Scheduling, introducing dedicated Scheduler component > > > Key: FLINK-10429 > URL: https://issues.apache.org/jira/browse/FLINK-10429 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > This epic tracks the redesign of scheduling in Flink. Scheduling is currently > a concern that is scattered across different components, mainly the > ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on > the granularity of individual tasks, which makes holistic scheduling > strategies hard to implement. In this epic we aim to introduce a dedicated > Scheduler component that can support use cases like auto-scaling, > local-recovery, and resource optimized batch. > The design for this feature is developed here: > https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component
[ https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629735#comment-16629735 ] shuai.xu commented on FLINK-10429: -- +1 This proposal goes in the same direction as our approach. We have already initiated a discussion about how to make the scheduling strategy pluggable in [https://docs.google.com/document/d/1zAseuBnqNXg3pst3vLBTc8yGOUo485J2LVWBAdFCW9I/edit]. I think we can make the scheduler more powerful together. > Redesign Flink Scheduling, introducing dedicated Scheduler component > > > Key: FLINK-10429 > URL: https://issues.apache.org/jira/browse/FLINK-10429 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > This epic tracks the redesign of scheduling in Flink. Scheduling is currently > a concern that is scattered across different components, mainly the > ExecutionGraph/Execution and the SlotPool. Scheduling also happens only on > the granularity of individual tasks, which makes holistic scheduling > strategies hard to implement. In this epic we aim to introduce a dedicated > Scheduler component that can support use cases like auto-scaling, > local-recovery, and resource optimized batch. > The design for this feature is developed here: > https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629718#comment-16629718 ] ASF GitHub Bot commented on FLINK-8532: --- Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-424951099 cc @tillrohrmann @StephanEwen could you help advance this PR? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RebalancePartitioner should use Random value for its first partition > > > Key: FLINK-8532 > URL: https://issues.apache.org/jira/browse/FLINK-8532 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Yuta Morisawa >Assignee: Guibo Pan >Priority: Major > Labels: pull-request-available > > In some conditions, RebalancePartitioner doesn't balance data correctly > because it uses the same value for selecting the next operators. > RebalancePartitioner initializes its partition id using the same value in > every thread, so it does balance data overall, but at any one moment the amount of > data in each operator is skewed. > Particularly, when the data rate of the former operators is equal, the data skew > becomes severe. > > > Example: > Consider a simple operator chain. > -> map1 -> rebalance -> map2 -> > Each map operator (map1, map2) contains three subtasks (subtasks 1, 2, 3 and 4, 5, > 6). > map1 map2 > st1 st4 > st2 st5 > st3 st6 > > At the beginning, every subtask in map1 sends data to st4 in map2 because > they all use the same initial partition id. > The next time map1 receives data, st1, st2 and st3 send data to st5 because they > incremented their partition id when they processed the former data. 
> In my environment, it takes twice as long to process data when I use the > RebalancePartitioner compared to other partitioners (rescale, keyBy). > > To solve this problem, in my opinion, RebalancePartitioner should use its own > operator id for the initial value. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
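The skew described in this issue can be reproduced with a toy round-robin channel selector (a hypothetical sketch; class and method names are assumptions, not Flink's actual ChannelSelector API): when every upstream subtask starts from the same initial channel, their first records all target the same downstream subtask, whereas seeding each instance with a random start decorrelates the streams.

```java
import java.util.concurrent.ThreadLocalRandom;

public class RebalanceSketch {
    final int numChannels;
    int next; // index of the next channel to emit to

    RebalanceSketch(int numChannels, int initialChannel) {
        this.numChannels = numChannels;
        this.next = initialChannel;
    }

    // Plain round-robin: return the current channel, then advance.
    int selectChannel() {
        int channel = next;
        next = (next + 1) % numChannels;
        return channel;
    }

    public static void main(String[] args) {
        // Current behaviour: three upstream subtasks all start at channel 0,
        // so their first records all land on the same downstream subtask.
        for (int i = 0; i < 3; i++) {
            System.out.println("fixed start, subtask " + i + " -> channel "
                    + new RebalanceSketch(3, 0).selectChannel()); // always 0
        }
        // Proposed fix: a random initial channel per instance spreads the
        // first records across the downstream subtasks.
        for (int i = 0; i < 3; i++) {
            int start = ThreadLocalRandom.current().nextInt(3);
            System.out.println("random start, subtask " + i + " -> channel "
                    + new RebalanceSketch(3, start).selectChannel());
        }
    }
}
```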
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629716#comment-16629716 ] ASF GitHub Bot commented on FLINK-8532: --- Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-424951099 cc @tillrohrmann @StephanEwen could you help advance this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RebalancePartitioner should use Random value for its first partition > > > Key: FLINK-8532 > URL: https://issues.apache.org/jira/browse/FLINK-8532 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Yuta Morisawa >Assignee: Guibo Pan >Priority: Major > Labels: pull-request-available > > In some conditions, RebalancePartitioner doesn't balance data correctly > because it uses the same value for selecting the next operators. > RebalancePartitioner initializes its partition id using the same value in > every thread, so it does balance data overall, but at any one moment the amount of > data in each operator is skewed. > Particularly, when the data rate of the former operators is equal, the data skew > becomes severe. > > > Example: > Consider a simple operator chain. > -> map1 -> rebalance -> map2 -> > Each map operator (map1, map2) contains three subtasks (subtasks 1, 2, 3 and 4, 5, > 6). > map1 map2 > st1 st4 > st2 st5 > st3 st6 > > At the beginning, every subtask in map1 sends data to st4 in map2 because > they all use the same initial partition id. > The next time map1 receives data, st1, st2 and st3 send data to st5 because they > incremented their partition id when they processed the former data. 
> In my environment, it takes twice as long to process data when I use the > RebalancePartitioner compared to other partitioners (rescale, keyBy). > > To solve this problem, in my opinion, RebalancePartitioner should use its own > operator id for the initial value. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-424951099 cc @tillrohrmann @StephanEwen could you help advance this PR? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-424951099 cc @tillrohrmann @StephanEwen could you help advance this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10418) Add COTH math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629709#comment-16629709 ] ASF GitHub Bot commented on FLINK-10418: yanghua commented on a change in pull request #6764: [FLINK-10418] [Table API & SQL] Add COTH math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6764#discussion_r220784635 ## File path: docs/dev/table/functions.md ## @@ -1230,6 +1230,17 @@ COT(numeric) + + +{% highlight text %} +COTH(numeric) +{% endhighlight %} + + +Returns the hyperbolic cotangent of a numeric. Review comment: please add some information about returned types, refer to @pnowojski 's review suggestion in #6700 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add COTH math function supported in Table API and SQL > - > > Key: FLINK-10418 > URL: https://issues.apache.org/jira/browse/FLINK-10418 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > > Inspired by FLINK-10398 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on a change in pull request #6764: [FLINK-10418] [Table API & SQL] Add COTH math function supported in Table API and SQL
yanghua commented on a change in pull request #6764: [FLINK-10418] [Table API & SQL] Add COTH math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6764#discussion_r220784635 ## File path: docs/dev/table/functions.md ## @@ -1230,6 +1230,17 @@ COT(numeric) + + +{% highlight text %} +COTH(numeric) +{% endhighlight %} + + +Returns the hyperbolic cotangent of a numeric. Review comment: please add some information about returned types, refer to @pnowojski 's review suggestion in #6700 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
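For context on the function under review: the hyperbolic cotangent is coth(x) = cosh(x)/sinh(x), defined for x ≠ 0. The JDK has no `Math.coth`, so the sketch below composes it from `Math.cosh` and `Math.sinh` (an illustration only, not the code from the PR):

```java
public class CothExample {
    // coth(x) = cosh(x) / sinh(x); undefined at x = 0 (sinh(0) = 0).
    static double coth(double x) {
        return Math.cosh(x) / Math.sinh(x);
    }

    public static void main(String[] args) {
        System.out.println(coth(1.0));  // ~= 1.3130
        System.out.println(coth(-0.5)); // ~= -2.1640
    }
}
```

As suggested in the review, a doc entry for COTH would also state the return type; for a DOUBLE input this composition yields a DOUBLE.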
[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener
[ https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629684#comment-16629684 ] ASF GitHub Bot commented on FLINK-10386: TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener URL: https://github.com/apache/flink/pull/6729#discussion_r220779905 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ## @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() { awaitLatch.await(); task.failExternally(new Exception("test")); - assertTrue(task.getExecutionState() == ExecutionState.FAILED); Review comment: Oh I get your point, sorry for the delay. This is a bit about style, and I think both of them are OK. If there is a mandatory requirement on "not change that", I am ok to revert it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove legacy class TaskExecutionStateListener > -- > > Key: FLINK-10386 > URL: https://issues.apache.org/jira/browse/FLINK-10386 > Project: Flink > Issue Type: Sub-task > Components: TaskManager >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After a discussion > [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] > with [~trohrm...@apache.org], I started to analyze the usage of > {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. > In conclusion, we abandon the {{TaskExecutionStateListener}} strategy; no > component relies on it. Instead, we introduce {{TaskManagerActions}} to take > over the role of the communication of {{Task}} with {{TaskManager}}. 
No one > except {{TaskManager}} should directly communicate with {{Task}}, so the > legacy class {{TaskExecutionStateListener}} can be safely removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener
TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener URL: https://github.com/apache/flink/pull/6729#discussion_r220779905 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ## @@ -429,7 +413,7 @@ public void testFailExternallyDuringInvoke() { awaitLatch.await(); task.failExternally(new Exception("test")); - assertTrue(task.getExecutionState() == ExecutionState.FAILED); Review comment: Oh I get your point, sorry for the delay. This is a bit about style, and I think both of them are OK. If there is a mandatory requirement on "not change that", I am ok to revert it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener
[ https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629681#comment-16629681 ] ASF GitHub Bot commented on FLINK-10386: TisonKun commented on a change in pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener URL: https://github.com/apache/flink/pull/6729#discussion_r220779551 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ## @@ -846,6 +817,23 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable // Test Utilities // + private static void waitUntilExecutionState(Task task, ExecutionState expectedState, Deadline deadline) { + while (deadline.hasTimeLeft()) { + if (expectedState == task.getExecutionState()) { + return; + } + + try { + Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200)); Review comment: Thanks! I changed the waiting style to a notify style; it should be more stable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove legacy class TaskExecutionStateListener > -- > > Key: FLINK-10386 > URL: https://issues.apache.org/jira/browse/FLINK-10386 > Project: Flink > Issue Type: Sub-task > Components: TaskManager >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After a discussion > [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] > with [~trohrm...@apache.org], I started to analyze the usage of > {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. > In conclusion, we abandoned the {{TaskExecutionStateListener}} strategy and no > component relies on it anymore.
Instead, we introduced {{TaskManagerActions}} to take > over the communication between {{Task}} and {{TaskManager}}. No one > except {{TaskManager}} should communicate with {{Task}} directly, so the > legacy class {{TaskExecutionStateListener}} can be safely removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
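The reviewer's switch from a polling wait to a notify-style wait can be sketched as follows. This is a minimal illustration with hypothetical names (StateNotifier is not Flink's actual API; the test in the diff uses Flink's own Task/ExecutionState types):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal notify-style wait: instead of polling the state in a sleep loop,
// the task pushes state changes and the waiter blocks on a latch.
class StateNotifier {
    private final CountDownLatch reached = new CountDownLatch(1);
    private final String expectedState;

    StateNotifier(String expectedState) {
        this.expectedState = expectedState;
    }

    // Called by the task whenever its execution state changes.
    void onStateChange(String newState) {
        if (expectedState.equals(newState)) {
            reached.countDown();  // wakes any waiter immediately, no sleep granularity
        }
    }

    // Blocks until the expected state is reached or the timeout elapses.
    boolean awaitState(long timeoutMillis) throws InterruptedException {
        return reached.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Compared with the 200 ms sleep loop in the diff above, this wakes up exactly when the state changes, avoiding both wasted sleeps and flaky timing in tests.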
[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629674#comment-16629674 ] ASF GitHub Bot commented on FLINK-10339: zhijiangW commented on issue #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool URL: https://github.com/apache/flink/pull/6762#issuecomment-424935982 Thanks for your reviews! @StephanEwen That is a good idea for adding `allocateUnpooledOffHeapMemory` in the factory, so it can also be used easily in `MemoryManager` and `NetworkBufferPool`. I have already updated the code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > SpillReadBufferPool cannot use off-heap memory > -- > > Key: FLINK-10339 > URL: https://issues.apache.org/jira/browse/FLINK-10339 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > > Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce > memory copies from the Flink {{Buffer}} to Netty's internal {{ByteBuf}} during > transport on the sender side. > > But the {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses > heap memory for caching. We can make it off-heap by default, similar to > {{NetworkBufferPool}}, or decide the type via the current parameter > {{taskmanager.memory.off-heap}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
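The heap vs. off-heap distinction discussed in this issue comes down to `java.nio` direct buffers. A hedged sketch of choosing the buffer type by configuration (BufferFactory is an illustrative name, not Flink's MemorySegmentFactory):

```java
import java.nio.ByteBuffer;

// Hedged sketch: choosing heap vs. off-heap (direct) buffers by configuration.
class BufferFactory {
    // Allocates a buffer of the given size, off-heap when requested.
    static ByteBuffer allocate(int size, boolean offHeap) {
        // Direct buffers live outside the JVM heap, which avoids an extra copy
        // when handing data to native I/O (e.g. Netty's socket writes).
        return offHeap ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
    }
}
```

Centralizing the choice in one factory method, as the review suggests, keeps the heap/off-heap decision in a single place for all pools.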
[jira] [Commented] (FLINK-8594) Make producesUpdates in DataStreamGroupAggregate return false if it is a distinct group by without state retention configuration
[ https://issues.apache.org/jira/browse/FLINK-8594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629671#comment-16629671 ] Hequn Cheng commented on FLINK-8594: Hi [~twalthr], you are right. {{SELECT DISTINCT}} should be an append stream. The main problem is that we have to output data to reset the state clean-up timers of the downstream operators when a retention time has been set. Sorry that I haven't started on the problem yet. To solve it, I will try to solve [FLINK-8566|https://issues.apache.org/jira/browse/FLINK-8566] first. What do you think? > Make producesUpdates in DataStreamGroupAggregate return false if it is a > distinct group by without state retention configuration > - > > Key: FLINK-8594 > URL: https://issues.apache.org/jira/browse/FLINK-8594 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Internally, the {{DISTINCT}} is translated into a {{GROUP BY}} with all > distinct fields being keys and no aggregation functions. However, this kind > of {{GROUP BY}} doesn't generate updates at all if the state retention time has > not been configured. We can treat the result table as an append table. > Therefore, we can make the {{producesUpdates}} function return false, so the > downstream group by will not choose the corresponding retract agg function, > i.e., {{MaxWithRetractAggFunction}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10429) Redesign Flink Scheduling, introducing dedicated Scheduler component
[ https://issues.apache.org/jira/browse/FLINK-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629571#comment-16629571 ] JIN SUN commented on FLINK-10429: - +1 I like the proposal and have left some comments in the document. > Redesign Flink Scheduling, introducing dedicated Scheduler component > > > Key: FLINK-10429 > URL: https://issues.apache.org/jira/browse/FLINK-10429 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > > This epic tracks the redesign of scheduling in Flink. Scheduling is currently > a concern that is scattered across different components, mainly the > ExecutionGraph/Execution and the SlotPool. Scheduling also happens only at > the granularity of individual tasks, which makes holistic scheduling > strategies hard to implement. In this epic we aim to introduce a dedicated > Scheduler component that can support use-cases like auto-scaling, > local recovery, and resource-optimized batch execution. > The design for this feature is developed here: > https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10446) Use the "guava beta checker" plugin to keep off of @Beta API
Ted Yu created FLINK-10446: -- Summary: Use the "guava beta checker" plugin to keep off of @Beta API Key: FLINK-10446 URL: https://issues.apache.org/jira/browse/FLINK-10446 Project: Flink Issue Type: Task Reporter: Ted Yu The Guava people publish an Error Prone plugin to detect when stuff that's annotated with @Beta gets used. Those things shouldn't be used because the project gives no promises about deprecating before removal. plugin: https://github.com/google/guava-beta-checker -- This message was sent by Atlassian JIRA (v7.6.3#76005)
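The checker is wired in through Error Prone at compile time. A hedged sketch of a Maven configuration, roughly following the plugin's README; the coordinates and versions below are illustrative and should be checked against the current README:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <compilerArgs>
      <!-- enable the Error Prone compiler plugin -->
      <arg>-XDcompilePolicy=simple</arg>
      <arg>-Xplugin:ErrorProne</arg>
    </compilerArgs>
    <annotationProcessorPaths>
      <path>
        <groupId>com.google.errorprone</groupId>
        <artifactId>error_prone_core</artifactId>
        <version>2.3.2</version> <!-- illustrative version -->
      </path>
      <path>
        <groupId>com.google.guava</groupId>
        <artifactId>guava-beta-checker</artifactId>
        <version>1.0</version> <!-- illustrative version -->
      </path>
    </annotationProcessorPaths>
  </configuration>
</plugin>
```

With this in place, any use of a Guava `@Beta` API fails the build with a `BetaApi` error.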
[jira] [Updated] (FLINK-10288) Failover Strategy improvement
[ https://issues.apache.org/jira/browse/FLINK-10288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JIN SUN updated FLINK-10288: Fix Version/s: 1.7.0 > Failover Strategy improvement > - > > Key: FLINK-10288 > URL: https://issues.apache.org/jira/browse/FLINK-10288 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Fix For: 1.7.0 > > > Flink makes significant efforts to make streaming jobs fault tolerant. The > checkpoint mechanism and exactly-once semantics set Flink apart from > other systems. However, some cases are still not handled very > well. These cases apply to both streaming and batch scenarios, and they are > orthogonal to the current fault-tolerance mechanism. Here is a summary of those > cases: > # Some failures are non-recoverable, such as a user error: > DivideByZeroException. We shouldn't try to restart the task, as it will never > succeed. DivideByZeroException is just a simple case; such errors > are sometimes not easy to reproduce or predict, as they might only be triggered > by specific input data, so we shouldn't retry for all user-code exceptions. > # There is no limit on task retries today: unless a SuppressRestartException > is encountered, a task will keep retrying until it succeeds. As mentioned > above, some cases shouldn't be retried at all, and for the exceptions we > can retry, such as a network exception, should we have a retry limit? We need > to retry on any transient issue, but we also need to set a limit to avoid > infinite retries and wasted resources. Batch and streaming workloads > might need different strategies. > # Some exceptions are due to hardware issues, such as disk/network > malfunctions. When a task/TaskManager fails on one of these, we'd better detect it and > avoid scheduling onto that machine next time.
> # If a task reads from a blocking result partition and its input is not > available, we can 'revoke' the producing task: mark the task as failed and rerun the > upstream task to regenerate the data. The revocation can propagate up through the > chain. In Spark, revocation is naturally supported by lineage. > To make fault tolerance easier, we need to keep behavior as deterministic as > possible. For user code, this is not easy to control. However, for > system-related code, we can fix it. For example, we should at least make sure > that different attempts of the same task have the same inputs (we have a bug > in the current codebase (DataSourceTask) that cannot guarantee this). Note that > this is tracked by [FLINK-10205]. > For details, see this proposal: > [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10298) Batch Job Failover Strategy
[ https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JIN SUN updated FLINK-10298: Fix Version/s: 1.7.0 > Batch Job Failover Strategy > --- > > Key: FLINK-10298 > URL: https://issues.apache.org/jira/browse/FLINK-10298 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Fix For: 1.7.0 > > > The new failover strategy needs to handle failures according to > their failure types. It orchestrates all the logic we mentioned in this > [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit]; > we can put the logic in the onTaskFailure method of the FailoverStrategy > interface, with the logic inline: > {code:java} > public void onTaskFailure(Execution taskExecution, Throwable cause) { > //1. Get the throwable type > //2. If the type is NonRecoverableType, fail the job > //3. If the type is PartitionDataMissingError, do revocation > //4. If the type is EnvironmentError, check the blacklist > //5. Other failure types are recoverable, but we need to remember the > count of the failures; > //6. if it exceeds the threshold, fail the job > }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JIN SUN updated FLINK-10289: Fix Version/s: 1.7.0 > Classify Exceptions to different category for apply different failover > strategy > --- > > Key: FLINK-10289 > URL: https://issues.apache.org/jira/browse/FLINK-10289 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > We need to classify exceptions and treat them with different strategies. To > do this, we propose to introduce the following throwable types, and the > corresponding exceptions: > * NonRecoverable > ** We shouldn't retry if an exception was classified as NonRecoverable. > ** For example, NoResourceAvailableException is a NonRecoverable exception. > ** Introduce a new exception UserCodeException to wrap all exceptions that are > thrown from user code. > * PartitionDataMissingError > ** In certain scenarios producer data was transferred in blocking mode or > data was saved in a persistent store. If the partition is missing, we need to > revoke/rerun the producing task to regenerate the data. > ** Introduce a new exception PartitionDataMissingException to wrap all those > kinds of issues. > * EnvironmentError > ** This happens due to hardware or software issues that are related to > specific environments. The assumption is that a task will succeed if we run > it in a different environment, and other tasks running in this bad environment > will very likely fail. If multiple tasks fail on the same machine due to an > EnvironmentError, we should consider adding the bad machine to a blacklist > and avoid scheduling tasks on it. > ** Introduce a new exception EnvironmentException to wrap all those kinds of > issues. > * Recoverable > ** We assume all other issues are recoverable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
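The classification described above could be sketched roughly like this. The enum values mirror the categories in the issue; the name-based matching logic is purely illustrative, not the actual implementation in the linked PR:

```java
// Hedged sketch of the proposed throwable classification (illustrative names/logic).
enum ThrowableType {
    NON_RECOVERABLE,         // e.g. NoResourceAvailableException, user-code errors: never retry
    PARTITION_DATA_MISSING,  // producer data lost: revoke and rerun the producing task
    ENVIRONMENT_ERROR,       // bad machine/disk/network: consider blacklisting the host
    RECOVERABLE              // transient failures: retry, up to a limit
}

class ThrowableClassifier {
    static ThrowableType classify(Throwable t) {
        return classifyByName(t.getClass().getSimpleName());
    }

    // Maps an exception class name to a failover category; unknown names default to RECOVERABLE.
    static ThrowableType classifyByName(String simpleName) {
        if (simpleName.contains("NoResourceAvailable") || simpleName.contains("UserCode")) {
            return ThrowableType.NON_RECOVERABLE;
        } else if (simpleName.contains("PartitionDataMissing")) {
            return ThrowableType.PARTITION_DATA_MISSING;
        } else if (simpleName.contains("Environment")) {
            return ThrowableType.ENVIRONMENT_ERROR;
        }
        return ThrowableType.RECOVERABLE;
    }
}
```

In practice a marker-interface or annotation check would be more robust than name matching; this only shows the shape of the decision.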
[jira] [Updated] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JIN SUN updated FLINK-10205: Affects Version/s: 1.6.2 1.6.1 > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.6.1, 1.6.2 >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Labels: pull-request-available > Original Estimate: 168h > Remaining Estimate: 168h > > Today the DataSourceTask pulls InputSplits from the JobManager to achieve better > performance. However, when a DataSourceTask fails and reruns, it will not get > the same splits as its previous attempt, which can introduce inconsistent > results or even data corruption. > Furthermore, if two executions run at the same time (in a batch > scenario), these two executions should process the same splits. > We need to fix this issue to make the inputs of a DataSourceTask > deterministic. The proposal is to save all splits in the ExecutionVertex so that the > DataSourceTask can pull splits from there. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
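The proposal, remembering which splits an attempt received so that a retry replays exactly the same ones, can be sketched like this (hypothetical names; Flink's real InputSplit and ExecutionVertex types are richer):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hedged sketch: record the splits handed to a task so a retry sees the same inputs.
class SplitAssignment {
    private final List<String> assignedSplits = new ArrayList<>();  // splits already handed out
    private final Iterator<String> remaining;

    SplitAssignment(List<String> allSplits) {
        this.remaining = allSplits.iterator();
    }

    // First attempt: pull the next split and remember it.
    String nextSplit() {
        if (!remaining.hasNext()) {
            return null;
        }
        String split = remaining.next();
        assignedSplits.add(split);
        return split;
    }

    // Retry attempt: replay exactly the splits the failed attempt had received.
    List<String> splitsForRetry() {
        return List.copyOf(assignedSplits);
    }
}
```

Keeping the assignment in per-vertex state (rather than letting the retry pull fresh splits from a shared assigner) is what makes the task's inputs deterministic across attempts.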
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629473#comment-16629473 ] ASF GitHub Bot commented on FLINK-9377: --- StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220735704 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ## @@ -435,7 +435,6 @@ private void assertEqualStateMetaInfoSnapshots(StateMetaInfoSnapshot expected, S Assert.assertEquals(expected.getName(), actual.getName()); Assert.assertEquals(expected.getBackendStateType(), actual.getBackendStateType()); Assert.assertEquals(expected.getOptionsImmutable(), actual.getOptionsImmutable()); - Assert.assertEquals(expected.getSerializersImmutable(), actual.getSerializersImmutable()); Review comment: Just double-checking: Was this intentionally removed? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. 
> Moreover, the fact that we use Java serialization to write the serializer, > and rely on it being re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > when performing even a compatible upgrade. > The proposal here is to keep only the config snapshot as meta information, > and use it as the single source of truth about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
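The "config snapshot as factory" idea can be illustrated with a toy pair of interfaces (a hedged sketch, not Flink's actual TypeSerializerConfigSnapshot API): the checkpoint persists only the snapshot's parameters, and a restore serializer is rebuilt from them, so no serializer object is ever Java-serialized into the checkpoint.

```java
import java.nio.charset.Charset;

// Toy serializer interface standing in for Flink's TypeSerializer.
interface MiniSerializer<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// The snapshot carries enough information (here, just a charset name) to act
// as a factory for a serializer capable of reading the old data.
class StringSnapshot {
    private final String charsetName;  // the only "schema" parameter to persist

    StringSnapshot(String charsetName) {
        this.charsetName = charsetName;
    }

    // Factory method: rebuild a restore serializer purely from persisted parameters.
    MiniSerializer<String> restoreSerializer() {
        final Charset charset = Charset.forName(charsetName);
        return new MiniSerializer<String>() {
            @Override
            public byte[] serialize(String value) {
                return value.getBytes(charset);
            }
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes, charset);
            }
        };
    }
}
```

Because only `charsetName` is persisted, the serializer implementation can evolve freely between Flink versions without breaking restores.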
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629475#comment-16629475 ] ASF GitHub Bot commented on FLINK-9377: --- StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220738712 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java ## @@ -21,35 +21,131 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; +import java.io.IOException; + /** * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration. - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the - * serializer is registered to. + * The configuration snapshot of a serializer is persisted within checkpoints + * as a single source of meta information about the schema of serialized data in the checkpoint. + * This serves three purposes: + * + * + * Capturing serializer parameters and schema: a serializer's configuration snapshot + * represents information about the parameters, state, and schema of a serializer. + * This is explained in more detail below. * - * The persisted configuration may later on be used by new serializers to ensure serialization compatibility - * for the same managed state. 
In order for new serializers to be able to ensure this, the configuration snapshot - * should encode sufficient information about: + * Compatibility checks for new serializers: when new serializers are available, + * they need to be checked whether or not they are compatible to read the data written by the previous serializer. + * This is performed by providing the new serializer to the correspondibng serializer configuration + * snapshots in checkpoints. + * + * Factory for a read serializer when schema conversion is required: in the case that new + * serializers are not compatible to read previous data, a schema conversion process executed across all data + * is required before the new serializer can be continued to be used. This conversion process requires a compatible + * read serializer to restore serialized bytes as objects, and then written back again using the new serializer. + * In this scenario, the serializer configuration snapshots in checkpoints doubles as a factory for the read + * serializer of the conversion process. + * + * + * Serializer Configuration and Schema + * + * Since serializer configuration snapshots needs to be used to ensure serialization compatibility + * for the same managed state as well as serving as a factory for compatible read serializers, the configuration + * snapshot should encode sufficient information about: * * * Parameter settings of the serializer: parameters of the serializer include settings * required to setup the serializer, or the state of the serializer if it is stateful. If the serializer * has nested serializers, then the configuration snapshot should also contain the parameters of the nested * serializers. * - * Serialization schema of the serializer: the data format used by the serializer. + * Serialization schema of the serializer: the binary format used by the serializer, or + * in other words, the schema of data written by the serializer. 
* * * NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to * deserialize the configuration snapshot from its binary form. + * + * @param The data type that the originating serializer of this configuration serializes. */ @PublicEvolving -public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { +public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { /** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */ private ClassLoader userCodeClassLoader; + /** +* The originating serializer of this configuration snapshot. +* +* TODO to allow for incrementally adapting the implementation of serializer config snapshot subclasses, +* TODO we currently have a base implementation for the {@link #restoreSerializer()} +* TODO method which simply returns this serializer instance. The serializer is written +* TODO and read using Java serialization as part of reading / writing the config snapshot +*/ + private TypeSerializer
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629474#comment-16629474 ] ASF GitHub Bot commented on FLINK-9377: --- StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220726336 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}. + */ +public class TypeSerializerConfigSnapshotSerializationUtil { + + /** +* Writes a {@link TypeSerializerConfigSnapshot} to the provided data output view. 
+* +* It is written with a format that can be later read again using +* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}. +* +* @param out the data output view +* @param serializerConfigSnapshot the serializer configuration snapshot to write +* +* @throws IOException +*/ + public static void writeSerializerConfigSnapshot( + DataOutputView out, + TypeSerializerConfigSnapshot serializerConfigSnapshot, + TypeSerializer serializer) throws IOException { + + new TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, serializer).write(out); + } + + /** +* Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously +* written using {@link TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}. +* +* @param in the data input view +* @param userCodeClassLoader the user code class loader to use +* +* @return the read serializer configuration snapshot +* +* @throws IOException +*/ + public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot( + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { + + final TypeSerializerConfigSnapshotSerializationProxy proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader); + proxy.read(in); + + return proxy.getSerializerConfigSnapshot(); + } + + /** +* Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that was previously +* written using {@link TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}. 
+* +* @param in the data input view +* @param userCodeClassLoader the user code class loader to use +* +* @return the read serializer configuration snapshots +* +* @throws IOException +*/ + public static List> readSerializerConfigSnapshots( + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { + + int numFields = in.readInt(); + final List> serializerConfigSnapshots = new ArrayList<>(numFields); + + TypeSerializerConfigSnapshotSerializationProxy proxy; Review comment: Minor comment: Looks like that variable could be defined inside the loop. Good rule of thumb is that variables should have the minimal scope by default . This is
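The review comment's rule of thumb, declaring variables in the smallest scope that works, looks like this in isolation (a simplified, hypothetical example unrelated to the serialization proxy itself):

```java
import java.util.ArrayList;
import java.util.List;

class ScopeExample {
    // Before: variable declared outside the loop, wider scope than needed.
    static List<String> readAllWide(List<String> inputs) {
        List<String> results = new ArrayList<>();
        String upper;                         // lives for the whole method
        for (String in : inputs) {
            upper = in.toUpperCase();
            results.add(upper);
        }
        return results;
    }

    // After: minimal scope, the variable exists only for one iteration.
    static List<String> readAllNarrow(List<String> inputs) {
        List<String> results = new ArrayList<>();
        for (String in : inputs) {
            String upper = in.toUpperCase();  // scoped to this iteration
            results.add(upper);
        }
        return results;
    }
}
```

Both methods behave identically; the narrow version simply makes it impossible to accidentally reuse a stale value from a previous iteration.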
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629471#comment-16629471 ] ASF GitHub Bot commented on FLINK-9377: --- StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220734142 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; + +/** + * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards compatibility purposes. 
+ * + * In older versions of Flink (<= 1.6), we used to write state serializers into checkpoints, along + * with the serializer's configuration snapshot. Since 1.7.0, we no longer wrote the serializers, but + * instead used the configuration snapshot as a factory to instantiate serializers for restoring state. + * However, since some outdated implementations of configuration snapshots did not contain sufficient + * information to serve as a factory, the backwards compatible path for restoring from these older + * savepoints would be to just use the written serializer. + * + * Therefore, when restoring from older savepoints which still contained both the config snapshot + * and the serializer, they are both wrapped within this utility class. When the caller intends + * to instantiate a restore serializer, we simply return the wrapped serializer instance. + * + * @param the data type that the wrapped serializer instance serializes. + */ +@Internal +public class BackwardsCompatibleConfigSnapshot extends TypeSerializerConfigSnapshot { + + /** +* The actual serializer config snapshot. This may be {@code null} when reading a +* savepoint from Flink <= 1.2. +*/ + @Nullable + private TypeSerializerConfigSnapshot wrappedConfigSnapshot; + + /** +* The serializer instance written in savepoints. 
+*/ + @Nonnull + private TypeSerializer serializerInstance; + + public BackwardsCompatibleConfigSnapshot( + @Nullable TypeSerializerConfigSnapshot wrappedConfigSnapshot, + TypeSerializer serializerInstance) { + + this.wrappedConfigSnapshot = wrappedConfigSnapshot; + this.serializerInstance = Preconditions.checkNotNull(serializerInstance); + } + + @Override + public void write(DataOutputView out) throws IOException { + throw new UnsupportedOperationException( + "This is a dummy config snapshot used only for backwards compatibility."); + } + + @Override + public void read(DataInputView in) throws IOException { + throw new UnsupportedOperationException( + "This is a dummy config snapshot used only for backwards compatibility."); + } + + @Override + public int getVersion() { + throw new UnsupportedOperationException( + "This is a dummy config snapshot used only for backwards compatibility."); + } + + @Override + public TypeSerializer restoreSerializer() { + return serializerInstance; + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + if (wrappedConfigSnapshot != null) { + return (TypeSerializerSchemaCompatibility) wrappedConfigSnapshot.resolveSchemaCompatibility(newSerializer); +
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629470#comment-16629470 ] ASF GitHub Bot commented on FLINK-9377: --- StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220727617 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}. + */ +public class TypeSerializerConfigSnapshotSerializationUtil { + + /** +* Writes a {@link TypeSerializerConfigSnapshot} to the provided data output view. 
+* +* It is written with a format that can be later read again using +* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}. +* +* @param out the data output view +* @param serializerConfigSnapshot the serializer configuration snapshot to write +* +* @throws IOException +*/ + public static void writeSerializerConfigSnapshot( + DataOutputView out, + TypeSerializerConfigSnapshot serializerConfigSnapshot, + TypeSerializer serializer) throws IOException { + + new TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, serializer).write(out); + } + + /** +* Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously +* written using {@link TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}. +* +* @param in the data input view +* @param userCodeClassLoader the user code class loader to use +* +* @return the read serializer configuration snapshot +* +* @throws IOException +*/ + public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot( + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { + + final TypeSerializerConfigSnapshotSerializationProxy proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader); + proxy.read(in); + + return proxy.getSerializerConfigSnapshot(); + } + + /** +* Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that was previously +* written using {@link TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}. 
+* +* @param in the data input view +* @param userCodeClassLoader the user code class loader to use +* +* @return the read serializer configuration snapshots +* +* @throws IOException +*/ + public static List> readSerializerConfigSnapshots( + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { + + int numFields = in.readInt(); + final List> serializerConfigSnapshots = new ArrayList<>(numFields); + + TypeSerializerConfigSnapshotSerializationProxy proxy; + for (int i = 0; i < numFields; i++) { + proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader); + proxy.read(in); +
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629472#comment-16629472 ] ASF GitHub Bot commented on FLINK-9377: --- StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220739021 ## File path: flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java ## @@ -77,6 +77,11 @@ private void resolveVersionRead(int readVersion) throws VersionMismatchException } } + // TODO this is a temporary workaround for FLINK-9377 that should be removed + if (readVersion == getVersion() + 1) { Review comment: This seems fragile here, because it is in a common base class outside the serializer utils. Can this be solved by overriding "getCompatibleVersions()" instead? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove writing serializers as part of the checkpoint meta information > - > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. 
> Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
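Stephan's suggestion in the review comment above — overriding getCompatibleVersions() in the affected subclass rather than special-casing readVersion == getVersion() + 1 in the common base class — could look roughly like the following sketch. The class and method names are simplified stand-ins for illustration, not Flink's actual VersionedIOReadableWritable API.

```java
import java.io.IOException;
import java.util.Arrays;

// Simplified stand-in for a versioned readable; not Flink's actual API.
abstract class VersionedReadable {

    /** The version this implementation currently writes. */
    public abstract int getVersion();

    /** Versions this implementation can also read; subclasses may override. */
    public int[] getCompatibleVersions() {
        return new int[] { getVersion() };
    }

    /** Accepts the current version or any explicitly compatible version. */
    public final void checkVersion(int readVersion) throws IOException {
        if (readVersion == getVersion()) {
            return;
        }
        for (int compatible : getCompatibleVersions()) {
            if (readVersion == compatible) {
                return;
            }
        }
        throw new IOException("Incompatible version " + readVersion
                + "; compatible versions: " + Arrays.toString(getCompatibleVersions()));
    }
}

// The subclass opts into reading the next format version explicitly,
// keeping the workaround out of the shared base class.
class SnapshotProxy extends VersionedReadable {

    @Override
    public int getVersion() {
        return 2;
    }

    @Override
    public int[] getCompatibleVersions() {
        return new int[] { 2, 3 }; // also accept the newer on-disk format
    }
}
```

This keeps the version tolerance local to the one format that needs it, instead of silently accepting version + 1 for every VersionedIOReadableWritable subclass.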
[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information
[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629476#comment-16629476 ] ASF GitHub Bot commented on FLINK-9377: --- StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220738283 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}. + */ +public class TypeSerializerConfigSnapshotSerializationUtil { + + /** +* Writes a {@link TypeSerializerConfigSnapshot} to the provided data output view. 
+* +* It is written with a format that can be later read again using +* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}. +* +* @param out the data output view +* @param serializerConfigSnapshot the serializer configuration snapshot to write +* +* @throws IOException +*/ + public static void writeSerializerConfigSnapshot( + DataOutputView out, + TypeSerializerConfigSnapshot serializerConfigSnapshot, + TypeSerializer serializer) throws IOException { + + new TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, serializer).write(out); + } + + /** +* Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously +* written using {@link TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}. +* +* @param in the data input view +* @param userCodeClassLoader the user code class loader to use +* +* @return the read serializer configuration snapshot +* +* @throws IOException +*/ + public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot( + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { + + final TypeSerializerConfigSnapshotSerializationProxy proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader); + proxy.read(in); + + return proxy.getSerializerConfigSnapshot(); + } + + /** +* Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that was previously +* written using {@link TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}. 
+* +* @param in the data input view +* @param userCodeClassLoader the user code class loader to use +* +* @return the read serializer configuration snapshots +* +* @throws IOException +*/ + public static List> readSerializerConfigSnapshots( + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { + + int numFields = in.readInt(); + final List> serializerConfigSnapshots = new ArrayList<>(numFields); + + TypeSerializerConfigSnapshotSerializationProxy proxy; + for (int i = 0; i < numFields; i++) { + proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader); + proxy.read(in); +
[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220735704 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ## @@ -435,7 +435,6 @@ private void assertEqualStateMetaInfoSnapshots(StateMetaInfoSnapshot expected, S Assert.assertEquals(expected.getName(), actual.getName()); Assert.assertEquals(expected.getBackendStateType(), actual.getBackendStateType()); Assert.assertEquals(expected.getOptionsImmutable(), actual.getOptionsImmutable()); - Assert.assertEquals(expected.getSerializersImmutable(), actual.getSerializersImmutable()); Review comment: Just double-checking: Was this intentionally removed? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220734142 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Objects; + +/** + * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards compatibility purposes. + * + * In older versions of Flink (<= 1.6), we used to write state serializers into checkpoints, along + * with the serializer's configuration snapshot. 
Since 1.7.0, we no longer wrote the serializers, but + * instead used the configuration snapshot as a factory to instantiate serializers for restoring state. + * However, since some outdated implementations of configuration snapshots did not contain sufficient + * information to serve as a factory, the backwards compatible path for restoring from these older + * savepoints would be to just use the written serializer. + * + * Therefore, when restoring from older savepoints which still contained both the config snapshot + * and the serializer, they are both wrapped within this utility class. When the caller intends + * to instantiate a restore serializer, we simply return the wrapped serializer instance. + * + * @param the data type that the wrapped serializer instance serializes. + */ +@Internal +public class BackwardsCompatibleConfigSnapshot extends TypeSerializerConfigSnapshot { + + /** +* The actual serializer config snapshot. This may be {@code null} when reading a +* savepoint from Flink <= 1.2. +*/ + @Nullable + private TypeSerializerConfigSnapshot wrappedConfigSnapshot; + + /** +* The serializer instance written in savepoints. 
+*/ + @Nonnull + private TypeSerializer serializerInstance; + + public BackwardsCompatibleConfigSnapshot( + @Nullable TypeSerializerConfigSnapshot wrappedConfigSnapshot, + TypeSerializer serializerInstance) { + + this.wrappedConfigSnapshot = wrappedConfigSnapshot; + this.serializerInstance = Preconditions.checkNotNull(serializerInstance); + } + + @Override + public void write(DataOutputView out) throws IOException { + throw new UnsupportedOperationException( + "This is a dummy config snapshot used only for backwards compatibility."); + } + + @Override + public void read(DataInputView in) throws IOException { + throw new UnsupportedOperationException( + "This is a dummy config snapshot used only for backwards compatibility."); + } + + @Override + public int getVersion() { + throw new UnsupportedOperationException( + "This is a dummy config snapshot used only for backwards compatibility."); + } + + @Override + public TypeSerializer restoreSerializer() { + return serializerInstance; + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + if (wrappedConfigSnapshot != null) { + return (TypeSerializerSchemaCompatibility) wrappedConfigSnapshot.resolveSchemaCompatibility(newSerializer); + } else { + // if there is no configuration snapshot to check against, + // then we can only assume that the new serializer is compatible as is + return
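The wrapper class quoted above is truncated, but its delegation pattern can be sketched with simplified stand-in types. Serializer, Compatibility, and ConfigSnapshot below are illustrative names, not Flink's actual classes.

```java
// Simplified stand-ins for Flink's serializer/compatibility types (illustrative only).
interface Serializer<T> { }

enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

interface ConfigSnapshot<T> {
    Serializer<T> restoreSerializer();
    Compatibility resolveSchemaCompatibility(Serializer<T> newSerializer);
}

// Wraps a possibly-null config snapshot together with the serializer instance
// that old savepoints (Flink <= 1.6) still carried in their metadata.
class BackwardsCompatibleSnapshot<T> implements ConfigSnapshot<T> {

    private final ConfigSnapshot<T> wrapped;  // may be null for very old savepoints
    private final Serializer<T> instance;     // the serializer read from the savepoint

    BackwardsCompatibleSnapshot(ConfigSnapshot<T> wrapped, Serializer<T> instance) {
        this.wrapped = wrapped;
        this.instance = java.util.Objects.requireNonNull(instance);
    }

    @Override
    public Serializer<T> restoreSerializer() {
        // The restore serializer is simply the instance written in the savepoint.
        return instance;
    }

    @Override
    public Compatibility resolveSchemaCompatibility(Serializer<T> newSerializer) {
        if (wrapped != null) {
            // Delegate to the real config snapshot when one was written.
            return wrapped.resolveSchemaCompatibility(newSerializer);
        }
        // No snapshot to check against: assume the new serializer is compatible as is.
        return Compatibility.COMPATIBLE_AS_IS;
    }
}
```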
[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220738712 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java ## @@ -21,35 +21,131 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; +import java.io.IOException; + /** * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration. - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the - * serializer is registered to. + * The configuration snapshot of a serializer is persisted within checkpoints + * as a single source of meta information about the schema of serialized data in the checkpoint. + * This serves three purposes: + * + * + * Capturing serializer parameters and schema: a serializer's configuration snapshot + * represents information about the parameters, state, and schema of a serializer. + * This is explained in more detail below. * - * The persisted configuration may later on be used by new serializers to ensure serialization compatibility - * for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot - * should encode sufficient information about: + * Compatibility checks for new serializers: when new serializers are available, + * they need to be checked whether or not they are compatible to read the data written by the previous serializer. + * This is performed by providing the new serializer to the correspondibng serializer configuration + * snapshots in checkpoints. 
+ * + * Factory for a read serializer when schema conversion is required: in the case that new + * serializers are not compatible to read previous data, a schema conversion process executed across all data + * is required before the new serializer can be continued to be used. This conversion process requires a compatible + * read serializer to restore serialized bytes as objects, and then written back again using the new serializer. + * In this scenario, the serializer configuration snapshots in checkpoints doubles as a factory for the read + * serializer of the conversion process. + * + * + * Serializer Configuration and Schema + * + * Since serializer configuration snapshots needs to be used to ensure serialization compatibility + * for the same managed state as well as serving as a factory for compatible read serializers, the configuration + * snapshot should encode sufficient information about: * * * Parameter settings of the serializer: parameters of the serializer include settings * required to setup the serializer, or the state of the serializer if it is stateful. If the serializer * has nested serializers, then the configuration snapshot should also contain the parameters of the nested * serializers. * - * Serialization schema of the serializer: the data format used by the serializer. + * Serialization schema of the serializer: the binary format used by the serializer, or + * in other words, the schema of data written by the serializer. * * * NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to * deserialize the configuration snapshot from its binary form. + * + * @param The data type that the originating serializer of this configuration serializes. 
*/ @PublicEvolving -public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { +public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { /** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */ private ClassLoader userCodeClassLoader; + /** +* The originating serializer of this configuration snapshot. +* +* TODO to allow for incrementally adapting the implementation of serializer config snapshot subclasses, +* TODO we currently have a base implementation for the {@link #restoreSerializer()} +* TODO method which simply returns this serializer instance. The serializer is written +* TODO and read using Java serialization as part of reading / writing the config snapshot +*/ + private TypeSerializer serializer; + + /** +* Creates a serializer using this configuration, that is capable of reading data +* written by the serializer described by this configuration. +* +* @return the restored serializer. +
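The "factory for a read serializer when schema conversion is required" role described in this javadoc amounts to the following migration flow, sketched here with simplified stand-in types (ReaderWriter, SerializerSnapshot, and StateMigrator are illustrative names, not Flink's API):

```java
import java.util.ArrayList;
import java.util.List;

enum SchemaCompat { AS_IS, AFTER_MIGRATION }

// Minimal stand-in for a serializer: decode old bytes, encode new bytes.
interface ReaderWriter<T> {
    T read(byte[] bytes);
    byte[] write(T value);
}

// The checkpoint's config snapshot: checks compatibility and, when needed,
// acts as the factory for a serializer that can still read the old bytes.
interface SerializerSnapshot<T> {
    SchemaCompat resolveCompatibility(ReaderWriter<T> newSerializer);
    ReaderWriter<T> restoreSerializer();
}

class StateMigrator {

    /** Rewrites old state bytes with the new serializer when schemas differ. */
    static <T> List<byte[]> migrate(SerializerSnapshot<T> snapshot,
                                    ReaderWriter<T> newSerializer,
                                    List<byte[]> oldState) {
        if (snapshot.resolveCompatibility(newSerializer) == SchemaCompat.AS_IS) {
            return oldState; // the new serializer reads the old bytes directly
        }
        // Otherwise: restore a compatible read serializer from the snapshot,
        // decode each value, and re-encode it with the new serializer.
        ReaderWriter<T> reader = snapshot.restoreSerializer();
        List<byte[]> migrated = new ArrayList<>(oldState.size());
        for (byte[] bytes : oldState) {
            migrated.add(newSerializer.write(reader.read(bytes)));
        }
        return migrated;
    }
}
```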
[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220726336 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotSerializationUtil.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility methods for serialization of {@link TypeSerializerConfigSnapshot}. + */ +public class TypeSerializerConfigSnapshotSerializationUtil { + + /** +* Writes a {@link TypeSerializerConfigSnapshot} to the provided data output view. +* +* It is written with a format that can be later read again using +* {@link #readSerializerConfigSnapshot(DataInputView, ClassLoader)}. 
+* +* @param out the data output view +* @param serializerConfigSnapshot the serializer configuration snapshot to write +* +* @throws IOException +*/ + public static void writeSerializerConfigSnapshot( + DataOutputView out, + TypeSerializerConfigSnapshot serializerConfigSnapshot, + TypeSerializer serializer) throws IOException { + + new TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, serializer).write(out); + } + + /** +* Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously +* written using {@link TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}. +* +* @param in the data input view +* @param userCodeClassLoader the user code class loader to use +* +* @return the read serializer configuration snapshot +* +* @throws IOException +*/ + public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot( + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { + + final TypeSerializerConfigSnapshotSerializationProxy proxy = new TypeSerializerConfigSnapshotSerializationProxy(userCodeClassLoader); + proxy.read(in); + + return proxy.getSerializerConfigSnapshot(); + } + + /** +* Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that was previously +* written using {@link TypeSerializerConfigSnapshotSerializationUtil#writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}. 
+* +* @param in the data input view +* @param userCodeClassLoader the user code class loader to use +* +* @return the read serializer configuration snapshots +* +* @throws IOException +*/ + public static List> readSerializerConfigSnapshots( + DataInputView in, + ClassLoader userCodeClassLoader) throws IOException { + + int numFields = in.readInt(); + final List> serializerConfigSnapshots = new ArrayList<>(numFields); + + TypeSerializerConfigSnapshotSerializationProxy proxy; Review comment: Minor comment: Looks like that variable could be defined inside the loop. A good rule of thumb is that variables should have the minimal scope by default. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at:
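The scoping advice in the review comment above can be illustrated with a plain-Java sketch; the class and method names here are hypothetical, not Flink's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class ScopeExample {
    // Builds `count` items; the per-iteration builder is declared inside the
    // loop so its scope is as small as possible, per the review suggestion.
    public static List<String> readAll(int count) {
        List<String> results = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // loop-local variable: no stale state can leak across iterations
            StringBuilder item = new StringBuilder();
            item.append("item-").append(i);
            results.add(item.toString());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(readAll(3));
    }
}
```

Declaring the variable outside the loop compiles just as well, but widens its lifetime for no benefit and invites accidental reuse.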
[GitHub] StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints
StephanEwen commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220739021 ## File path: flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java ## @@ -77,6 +77,11 @@ private void resolveVersionRead(int readVersion) throws VersionMismatchException } } + // TODO this is a temporary workaround for FLINK-9377 that should be removed + if (readVersion == getVersion() + 1) { Review comment: This seems fragile here, because it is in a common base class outside the serializer utils. Can this be solved by overriding "getCompatibleVersions()" instead? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
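The alternative suggested above, overriding `getCompatibleVersions()` instead of special-casing `version + 1` in the common base class, might look roughly like the following. The abstract class here is a simplified stand-in for Flink's `VersionedIOReadableWritable`, and the concrete version numbers are made up for illustration:

```java
import java.util.Arrays;

public class VersionCompatExample {
    // Simplified stand-in for VersionedIOReadableWritable (flink-core).
    abstract static class Versioned {
        public abstract int getVersion();

        // Subclasses override this to accept additional read versions,
        // keeping the compatibility decision out of the base class.
        public int[] getCompatibleVersions() {
            return new int[] { getVersion() };
        }

        public boolean isCompatibleVersion(int readVersion) {
            return Arrays.stream(getCompatibleVersions())
                    .anyMatch(v -> v == readVersion);
        }
    }

    // Hypothetical proxy that also accepts the next format version, expressing
    // the FLINK-9377 workaround locally rather than in the shared base class.
    static class SnapshotProxy extends Versioned {
        @Override
        public int getVersion() {
            return 2;
        }

        @Override
        public int[] getCompatibleVersions() {
            return new int[] { 2, 3 };
        }
    }
}
```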
[jira] [Updated] (FLINK-10228) Add metrics for netty direct memory consumption
[ https://issues.apache.org/jira/browse/FLINK-10228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-10228: --- Description: netty direct memory usage can be exposed via metrics so that operator can keep track of memory consumption . (was: netty direct memory usage can be exposed via metrics so that operator can keep track of memory consumption.) > Add metrics for netty direct memory consumption > --- > > Key: FLINK-10228 > URL: https://issues.apache.org/jira/browse/FLINK-10228 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > netty direct memory usage can be exposed via metrics so that operator can > keep track of memory consumption . -- This message was sent by Atlassian JIRA (v7.6.3#76005)
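As a rough sketch of how direct memory consumption can be observed at the JVM level: the JDK exposes the NIO "direct" buffer pool via `BufferPoolMXBean`. This is a generic probe, not Flink's actual metric wiring, and Netty's pooled arenas may account memory beyond what this pool reports, so treat it as a lower bound:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

public class DirectMemoryProbe {
    // Returns bytes currently used by the JVM's "direct" buffer pool,
    // or -1 if the pool is not found.
    public static long directMemoryUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Force some direct allocation so the pool reports non-zero usage.
        java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocateDirect(1 << 20);
        System.out.println(directMemoryUsed() + " bytes (buffer: " + buf.capacity() + ")");
    }
}
```

A metrics gauge could call such a probe periodically and report the value to the operator's monitoring system.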
[jira] [Closed] (FLINK-10444) Make S3 entropy injection work with FileSystem safety net
[ https://issues.apache.org/jira/browse/FLINK-10444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-10444. > Make S3 entropy injection work with FileSystem safety net > - > > Key: FLINK-10444 > URL: https://issues.apache.org/jira/browse/FLINK-10444 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.7.0, 1.6.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.7.0, 1.6.2 > > > The FileSystem Safety net wraps FileSystems. > The EntropyInjector needs to be aware of that to recognize filesystems as > entropy injecting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10444) Make S3 entropy injection work with FileSystem safety net
[ https://issues.apache.org/jira/browse/FLINK-10444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-10444. -- Resolution: Fixed Fixed in - 1.6.2 via ec73c60aba5d564d8d946d82a4d7d0911e70dc9f - 1.7.0 via cc9b3769084634dc660f7a90aed0090cb46b3342 > Make S3 entropy injection work with FileSystem safety net > - > > Key: FLINK-10444 > URL: https://issues.apache.org/jira/browse/FLINK-10444 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.7.0, 1.6.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.7.0, 1.6.2 > > > The FileSystem Safety net wraps FileSystems. > The EntropyInjector needs to be aware of that to recognize filesystems as > entropy injecting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
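The fix described here — making the entropy injector recognize a filesystem even when the safety net has wrapped it — can be pictured as a single unwrap step before the check. All types below are simplified stand-ins, not Flink's real `FileSystem` classes:

```java
public class UnwrapExample {
    interface FileSystem {}

    // Stand-in for an entropy-injecting filesystem (e.g. the S3 one).
    static class EntropyFs implements FileSystem {}

    // Stand-in for the safety-net wrapper that delegates to an inner filesystem.
    static class SafetyNetWrapperFs implements FileSystem {
        final FileSystem delegate;

        SafetyNetWrapperFs(FileSystem delegate) {
            this.delegate = delegate;
        }
    }

    // Recognizes entropy injection even when the filesystem is wrapped once.
    public static boolean isEntropyInjecting(FileSystem fs) {
        if (fs instanceof SafetyNetWrapperFs) {
            fs = ((SafetyNetWrapperFs) fs).delegate;
        }
        return fs instanceof EntropyFs;
    }
}
```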
[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629239#comment-16629239 ] ASF GitHub Bot commented on FLINK-9455: --- GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors URL: https://github.com/apache/flink/pull/6734#discussion_r220674263 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) { // Resource Management // - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // + // Helper methods + // + + protected static Collection createSlotsPerWorker(int numSlots) { + final Collection slots = new ArrayList<>(numSlots); Review comment: It would be enough to write: `Collections.nCopies(numSlots, ResourceProfile.ANY);` The returned `List` would also be immutable. At the moment there is no need to return a mutable list. I would even say it is not right that after calling `Collection startNewWorker(ResourceProfile resourceProfile)`, one may potentially modify the state of the ResourceManager (by accidentally modifying the returned collection). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make SlotManager aware of multi slot TaskManagers > - > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, ResourceManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The {{SlotManager}} responsible for managing all available slots of a Flink > cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot > request. The started {{TaskManager}} can be started with multiple slots > configured but currently, the {{SlotManager}} thinks that it will be started > with a single slot. As a consequence, it might issue multiple requests to > start new TaskManagers even though a single one would be sufficient to > fulfill all pending slot requests. > In order to avoid requesting unnecessary resources which are freed after the > idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a > {{TaskManager}} is started with. That way the SlotManager only needs to > request a new {{TaskManager}} if all of the previously started slots > (potentially not yet registered and, thus, future slots) are being assigned > to slot requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
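The `Collections.nCopies` suggestion in the review comment above behaves as described: it returns a fixed-size, immutable view. A self-contained sketch, using `String` in place of Flink's `ResourceProfile` so it stays runnable:

```java
import java.util.Collections;
import java.util.List;

public class NCopiesExample {
    // Builds the fixed-size slot list the way the review suggests;
    // "ANY" stands in for ResourceProfile.ANY.
    public static List<String> createSlotsPerWorker(int numSlots) {
        return Collections.nCopies(numSlots, "ANY");
    }

    public static void main(String[] args) {
        List<String> slots = createSlotsPerWorker(4);
        System.out.println(slots.size());
        try {
            slots.add("ANY"); // immutable view: mutation throws
        } catch (UnsupportedOperationException e) {
            System.out.println("immutable");
        }
    }
}
```

Because the returned list is immutable, a caller of `startNewWorker` cannot accidentally mutate the ResourceManager's state through it, which is exactly the safety property the review asks for.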
[GitHub] GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors
GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors URL: https://github.com/apache/flink/pull/6734#discussion_r220674263 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) { // Resource Management // - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // + // Helper methods + // + + protected static Collection createSlotsPerWorker(int numSlots) { + final Collection slots = new ArrayList<>(numSlots); Review comment: It would be enough to write: `Collections.nCopies(numSlots, ResourceProfile.ANY);` The returned `List` would also be immutable. At the moment there is no need to return a mutable list. I would even say it is not right that after calling `Collection startNewWorker(ResourceProfile resourceProfile)`, one may potentially modify the state of the ResourceManager (by accidentally modifying the returned collection). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629238#comment-16629238 ] ASF GitHub Bot commented on FLINK-9455: --- GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors URL: https://github.com/apache/flink/pull/6734#discussion_r220674263 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) { // Resource Management // - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // + // Helper methods + // + + protected static Collection createSlotsPerWorker(int numSlots) { + final Collection slots = new ArrayList<>(numSlots); Review comment: It would be enough to write: `Collections.nCopies(numSlots, ResourceProfile.ANY);` The returned `List` would also be immutable. At the moment there is no need to return a mutable list. I would even say it is not right that after calling `Collection startNewWorker(ResourceProfile resourceProfile)`, one may potentially modify the state of the ResourceManager. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make SlotManager aware of multi slot TaskManagers > - > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, ResourceManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The {{SlotManager}} responsible for managing all available slots of a Flink > cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot > request. The started {{TaskManager}} can be started with multiple slots > configured but currently, the {{SlotManager}} thinks that it will be started > with a single slot. As a consequence, it might issue multiple requests to > start new TaskManagers even though a single one would be sufficient to > fulfill all pending slot requests. > In order to avoid requesting unnecessary resources which are freed after the > idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a > {{TaskManager}} is started with. That way the SlotManager only needs to > request a new {{TaskManager}} if all of the previously started slots > (potentially not yet registered and, thus, future slots) are being assigned > to slot requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9455) Make SlotManager aware of multi slot TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629235#comment-16629235 ] ASF GitHub Bot commented on FLINK-9455: --- GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors URL: https://github.com/apache/flink/pull/6734#discussion_r220674263 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) { // Resource Management // - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // + // Helper methods + // + + protected static Collection createSlotsPerWorker(int numSlots) { + final Collection slots = new ArrayList<>(numSlots); Review comment: It would be enough to write: `Collections.nCopies(numSlots, ResourceProfile.ANY);` The returned `List` is also immutable. At the moment there is no need to return a mutable list. I would even say it is not right that after calling `Collection startNewWorker(ResourceProfile resourceProfile)`, one may potentially modify the state of the ResourceManager. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make SlotManager aware of multi slot TaskManagers > - > > Key: FLINK-9455 > URL: https://issues.apache.org/jira/browse/FLINK-9455 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, ResourceManager >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The {{SlotManager}} responsible for managing all available slots of a Flink > cluster can request to start new {{TaskManagers}} if it cannot fulfill a slot > request. The started {{TaskManager}} can be started with multiple slots > configured but currently, the {{SlotManager}} thinks that it will be started > with a single slot. As a consequence, it might issue multiple requests to > start new TaskManagers even though a single one would be sufficient to > fulfill all pending slot requests. > In order to avoid requesting unnecessary resources which are freed after the > idle timeout, I suggest to make the {{SlotManager}} aware of how many slots a > {{TaskManager}} is started with. That way the SlotManager only needs to > request a new {{TaskManager}} if all of the previously started slots > (potentially not yet registered and, thus, future slots) are being assigned > to slot requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors
GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors URL: https://github.com/apache/flink/pull/6734#discussion_r220674263 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) { // Resource Management // - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // + // Helper methods + // + + protected static Collection createSlotsPerWorker(int numSlots) { + final Collection slots = new ArrayList<>(numSlots); Review comment: It would be enough to write: `Collections.nCopies(numSlots, ResourceProfile.ANY);` The returned `List` would also be immutable. At the moment there is no need to return a mutable list. I would even say it is not right that after calling `Collection startNewWorker(ResourceProfile resourceProfile)`, one may potentially modify the state of the ResourceManager. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors
GJL commented on a change in pull request #6734: [FLINK-9455][RM] Add support for multi task slot TaskExecutors URL: https://github.com/apache/flink/pull/6734#discussion_r220674263 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ## @@ -1176,8 +1177,22 @@ public void reportPayload(ResourceID resourceID, Void payload) { // Resource Management // - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // + // Helper methods + // + + protected static Collection createSlotsPerWorker(int numSlots) { + final Collection slots = new ArrayList<>(numSlots); Review comment: It would be enough to write: `Collections.nCopies(numSlots, ResourceProfile.ANY);` The returned `List` is also immutable. At the moment there is no need to return a mutable list. I would even say it is not right that after calling `Collection startNewWorker(ResourceProfile resourceProfile)`, one may potentially modify the state of the ResourceManager. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10426) Port TaskTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629199#comment-16629199 ] tison commented on FLINK-10426: --- This sub-task should consider testing that {{Task}} fails if blobs are missing. > Port TaskTest to new code base > -- > > Key: FLINK-10426 > URL: https://issues.apache.org/jira/browse/FLINK-10426 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Fix For: 1.7.0 > > > Port {{TaskTest}} to new code base -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10427) Port JobSubmitTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629197#comment-16629197 ] ASF GitHub Bot commented on FLINK-10427: TisonKun opened a new pull request #6768: [FLINK-10427] [tests] Port JobSubmitTest to new code base URL: https://github.com/apache/flink/pull/6768 ## What is the purpose of the change Port `JobSubmitTest` to new code base ## Brief change log `JobSubmitTest` has three tests, here is their diagnosis: `testFailureWhenJarBlobsMissing` should be ported to `TaskTest#...`, FLIP-6 loads library on TM, not JM. To clarify, it is not applicable for FLIP-6 JM fail job because of failing to load library. So the porting job is active only when porting `TaskTest`. (Because the TaskTest is legacy test, too. we are unable to JUST add a test case onto that for now). `testFailureWhenInitializeOnMasterFails` is covered by `JobSubmissionFailsITCase#testExceptionInInitializeOnMaster` `testAnswerFailureWhenSavepointReadFails` is covered by `SavepointITCase#testSubmitWithUnknownSavepointPath` ## Verifying this change All about removal of tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**no**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**) - The S3 file system connector: (**no**) ## Documentation - Does this pull request introduce a new feature? (**no**) - If yes, how is the feature documented? (**no**) cc @tillrohrmann @GJL This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port JobSubmitTest to new code base > --- > > Key: FLINK-10427 > URL: https://issues.apache.org/jira/browse/FLINK-10427 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{JobSubmitTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10427) Port JobSubmitTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10427: --- Labels: pull-request-available (was: ) > Port JobSubmitTest to new code base > --- > > Key: FLINK-10427 > URL: https://issues.apache.org/jira/browse/FLINK-10427 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{JobSubmitTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun opened a new pull request #6768: [FLINK-10427] [tests] Port JobSubmitTest to new code base
TisonKun opened a new pull request #6768: [FLINK-10427] [tests] Port JobSubmitTest to new code base URL: https://github.com/apache/flink/pull/6768 ## What is the purpose of the change Port `JobSubmitTest` to new code base ## Brief change log `JobSubmitTest` has three tests; here is their diagnosis: `testFailureWhenJarBlobsMissing` should be ported to `TaskTest#...`, since FLIP-6 loads libraries on the TM, not the JM. To clarify, under FLIP-6 it is no longer the JM that fails the job because of a failure to load the library. So the porting work can only happen when `TaskTest` itself is ported. (Because `TaskTest` is a legacy test too, we are unable to just add a test case to it for now.) `testFailureWhenInitializeOnMasterFails` is covered by `JobSubmissionFailsITCase#testExceptionInInitializeOnMaster` `testAnswerFailureWhenSavepointReadFails` is covered by `SavepointITCase#testSubmitWithUnknownSavepointPath` ## Verifying this change All about removal of tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**no**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**) - The S3 file system connector: (**no**) ## Documentation - Does this pull request introduce a new feature? (**no**) - If yes, how is the feature documented? (**no**) cc @tillrohrmann @GJL This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-10427) Port JobSubmitTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629002#comment-16629002 ] tison edited comment on FLINK-10427 at 9/26/18 5:47 PM: {{testFailureWhenJarBlobsMissing}} should be ported to {{TaskTest#...}}, FLIP-6 loads library on TM, not JM. To clarify, it is not applicable for FLIP-6 JM fail job because of failing to load library. So the porting job is active only when porting {{TaskTest}}. (Because the {{TaskTest}} is legacy test, too. we are unable to JUST add a test case onto that for now). was (Author: tison): {{testFailureWhenJarBlobsMissing}} should be ported to {{TaskTest#...}}, FLIP-6 loads library on TM, not JM. > Port JobSubmitTest to new code base > --- > > Key: FLINK-10427 > URL: https://issues.apache.org/jira/browse/FLINK-10427 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Fix For: 1.7.0 > > > Port {{JobSubmitTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10406) Port JobManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10406: --- Labels: pull-request-available (was: ) > Port JobManagerTest to new code base > > > Key: FLINK-10406 > URL: https://issues.apache.org/jira/browse/FLINK-10406 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{JobManagerTest}} to new code base > Not all of its tests should be ported, since some of them are covered by > {{JobMasterTest}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629181#comment-16629181 ] ASF GitHub Bot commented on FLINK-10406: TisonKun commented on issue #6765: [FLINK-10406] [tests] Port JobManagerTest to new code base URL: https://github.com/apache/flink/pull/6765#issuecomment-424804290 FYI, travis fails on `org.apache.flink.queryablestate.network.ClientTest#testSimpleRequests`, which I cannot reproduce locally. And since this pull request just modify test scope, without any changes on production files, it should not be the reason cause the failure. Ask for a re-trigger or dig out, since it said it is discouraged to send a commit for just re-trigger. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Port JobManagerTest to new code base > > > Key: FLINK-10406 > URL: https://issues.apache.org/jira/browse/FLINK-10406 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{JobManagerTest}} to new code base > Not all of its tests should be ported, since some of them are covered by > {{JobMasterTest}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6765: [FLINK-10406] [tests] Port JobManagerTest to new code base
TisonKun commented on issue #6765: [FLINK-10406] [tests] Port JobManagerTest to new code base URL: https://github.com/apache/flink/pull/6765#issuecomment-424804290 FYI, travis fails on `org.apache.flink.queryablestate.network.ClientTest#testSimpleRequests`, which I cannot reproduce locally. And since this pull request only modifies the test scope, without any changes to production files, it should not be the cause of the failure. Asking for a re-trigger or further investigation, since it is said to be discouraged to push a commit just to re-trigger. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629155#comment-16629155 ] Seth Wiesman commented on FLINK-10423: -- [~srichter] I have a question about the best way to handle resource management of the RocksDB native handler. The basic implementation uses a TimerTask to periodically pull metrics from RocksDB; this works fine until it is time to release the resource. The question I have is how to be a good citizen with the ResourceGuard. Manually calling cancel on the TimerTask before calling the blocking cancel on ResourceGuard seems brittle in the face of refactoring and it does not seem there is any way to access a CancellableRegistry from within RocksDBStateBackend. The simplest solution I've been able to come up with is to add a new method to ResourceGuard that effectively acts as a way to acquire a weak reference. {code:java} public boolean runIfResourceAvailable(Runnable function) { synchronized (lock) { if (!closed) { function.run(); } return !closed; } } {code} Run atomically only if the resource is available, and return whether the function was run. When this method returns false the calling class knows the resource is being or has been closed and it is time to shut down. The concern I have here is the potential of putting too much work inside the synchronized block. I'm curious what you think is the best path forward. > Forward RocksDB memory metrics to Flink metrics reporter > - > > Key: FLINK-10423 > URL: https://issues.apache.org/jira/browse/FLINK-10423 > Project: Flink > Issue Type: New Feature > Components: Metrics, State Backends, Checkpointing >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > > RocksDB contains a number of metrics at the column family level about current > memory usage, open memtables, etc that would be useful to users wishing > greater insight into what RocksDB is doing.
This work is inspired heavily by the > comments on this rocksdb issue thread > (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
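The method proposed in the comment above can be sketched stand-alone so that its stated contract holds (returns `false` once the resource is closed). Everything except the `runIfResourceAvailable` name is hypothetical, not the actual ResourceGuard:

```java
public class GuardedRunner {
    private final Object lock = new Object();
    private boolean closed = false;

    // Runs `action` atomically while the resource is open;
    // returns true if it ran, false if the resource is (being) closed.
    public boolean runIfResourceAvailable(Runnable action) {
        synchronized (lock) {
            if (!closed) {
                action.run();
                return true;
            }
            return false;
        }
    }

    // Marks the resource closed; subsequent calls will refuse to run.
    public void close() {
        synchronized (lock) {
            closed = true;
        }
    }
}
```

The concern about work inside the synchronized block is real: `action` here should only read the metrics, never block, or it will hold up `close()`.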
[jira] [Commented] (FLINK-10427) Port JobSubmitTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629130#comment-16629130 ] tison commented on FLINK-10427: --- {{testAnswerFailureWhenSavepointReadFails}} is covered by {{SavepointITCase#testSubmitWithUnknownSavepointPath}} > Port JobSubmitTest to new code base > --- > > Key: FLINK-10427 > URL: https://issues.apache.org/jira/browse/FLINK-10427 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Fix For: 1.7.0 > > > Port {{JobSubmitTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection
[ https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629119#comment-16629119 ] ASF GitHub Bot commented on FLINK-10415: zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient URL: https://github.com/apache/flink/pull/6763#discussion_r220642450 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ## @@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt .thenComposeAsync( channel -> { ClientHandler handler = channel.pipeline().get(ClientHandler.class); - CompletableFuture future = handler.getJsonFuture(); + + CompletableFuture future; + boolean success = false; + try { - httpRequest.writeTo(channel); + if (handler == null) { + throw new IOException("Netty pipeline was not properly initialized."); + } else { + httpRequest.writeTo(channel); + future = handler.getJsonFuture(); + success = true; + } } catch (IOException e) { Review comment: We should catch all exceptions here, then this issue wouldn't have occurred in the first place. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RestClient does not react to lost connection > > > Key: FLINK-10415 > URL: https://issues.apache.org/jira/browse/FLINK-10415 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.6.1, 1.7.0, 1.5.4 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not > seem to react to a lost connections in time. 
> When sending a request to the current leader it happened that the leader was
> killed just after establishing the connection. Then the {{RestClient}} did not
> fail the connection and was stuck writing a request or retrieving a response
> from the lost leader. I'm wondering whether we should introduce a
> {{ReadTimeoutHandler}} and {{WriteTimeoutHandler}} to handle these problems.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient
zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220642544

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

@@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
 	.thenComposeAsync(
 		channel -> {
 			ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-			CompletableFuture future = handler.getJsonFuture();
+
+			CompletableFuture future;
+			boolean success = false;
+
 			try {
-				httpRequest.writeTo(channel);
+				if (handler == null) {

Review comment: Do we know when/why this happens?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection
[ https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629120#comment-16629120 ] ASF GitHub Bot commented on FLINK-10415:

zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220642737

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

@@ -339,12 +338,26 @@ private static Request createRequest(String targetAddress, String targetUrl, Htt
 	.thenComposeAsync(
 		channel -> {
 			ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-			CompletableFuture future = handler.getJsonFuture();
+
+			CompletableFuture future;
+			boolean success = false;
+
 			try {
-				httpRequest.writeTo(channel);
+				if (handler == null) {
+					throw new IOException("Netty pipeline was not properly initialized.");
+				} else {
+					httpRequest.writeTo(channel);
+					future = handler.getJsonFuture();
+					success = true;
+				}
 			} catch (IOException e) {
-				return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+				future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
+			} finally {
+				if (!success) {
+					channel.close();

Review comment: Why aren't we closing the channel in both cases? Not being initialized _properly_ doesn't necessarily mean that there's nothing to clean up.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> RestClient does not react to lost connection
> --------------------------------------------
>
>                 Key: FLINK-10415
>                 URL: https://issues.apache.org/jira/browse/FLINK-10415
>             Project: Flink
>          Issue Type: Bug
>          Components: REST
>    Affects Versions: 1.6.1, 1.7.0, 1.5.4
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not
> seem to react to lost connections in time. When sending a request to the
> current leader it happened that the leader was killed just after establishing
> the connection. Then the {{RestClient}} did not fail the connection and was
> stuck writing a request or retrieving a response from the lost leader. I'm
> wondering whether we should introduce a {{ReadTimeoutHandler}} and
> {{WriteTimeoutHandler}} to handle these problems.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
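Stripped of the Netty specifics, the try/catch/finally shape under review can be sketched as self-contained Java. `FakeChannel` and `submit` below are hypothetical stand-ins for the Netty channel and the RestClient request path, not the real API; the point of the sketch is only how the success flag routes every failure path, including the missing-handler case, through the cleanup in `finally`.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Self-contained sketch of the success-flag cleanup pattern from the diff
// above. FakeChannel and submit() are hypothetical stand-ins, not Flink API.
class CleanupSketch {

    static final class FakeChannel {
        boolean closed = false;
        void close() { closed = true; }
    }

    static CompletableFuture<String> submit(FakeChannel channel, boolean handlerPresent) {
        CompletableFuture<String> future;
        boolean success = false;
        try {
            if (!handlerPresent) {
                // mirrors the handler == null branch in the diff
                throw new IOException("Netty pipeline was not properly initialized.");
            }
            future = new CompletableFuture<>(); // stands in for handler.getJsonFuture()
            success = true;
        } catch (IOException e) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            future = failed;
        } finally {
            if (!success) {
                channel.close(); // every failure path releases the connection
            }
        }
        return future;
    }
}
```

Because the flag is only set after the last step that can fail, the `finally` block needs no knowledge of which step failed — which is why a single `if (!success)` covers both the missing-handler and the write-failure cases.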
[jira] [Commented] (FLINK-10427) Port JobSubmitTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629111#comment-16629111 ] tison commented on FLINK-10427: --- {{testFailureWhenInitializeOnMasterFails}} is covered by {{JobSubmissionFailsITCase#testExceptionInInitializeOnMaster}} > Port JobSubmitTest to new code base > --- > > Key: FLINK-10427 > URL: https://issues.apache.org/jira/browse/FLINK-10427 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Fix For: 1.7.0 > > > Port {{JobSubmitTest}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10380) Check if key is not null before assigning to group in KeyedStream
[ https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629105#comment-16629105 ] ASF GitHub Bot commented on FLINK-10380:

StephanEwen commented on issue #6724: [FLINK-10380] Add precondition for assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-424788457

Concerning the other method you mentioned: that one is never used on the critical path, so things may be different there. Catching the null value earlier may be an even cleaner fix. But again, we should not introduce branches if avoidable.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Check if key is not null before assigning to group in KeyedStream
> -----------------------------------------------------------------
>
>                 Key: FLINK-10380
>                 URL: https://issues.apache.org/jira/browse/FLINK-10380
>             Project: Flink
>          Issue Type: Task
>    Affects Versions: 1.6.0
>            Reporter: Sayat Satybaldiyev
>            Priority: Minor
>              Labels: pull-request-available
>
> If a user creates a KeyedStream and partitions by a key which might be null, the
> Flink job throws a NullPointerException at runtime. However, the NPE that Flink
> throws is hard to debug and understand, as it doesn't refer to the place in the
> Flink job.
>
> *Suggestion:*
> Add a precondition that checks that the key is not null and throws a descriptive
> error if it is null.
>
> *Job Example*:
>
> {code:java}
> DataStream stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
>     .map(x -> (String)null)
>     .keyBy(x -> x);
> {code}
>
> An error that is thrown:
>
> {code:java}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException
> 	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
> 	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> 	at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
> Caused by: java.lang.RuntimeException
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> 16:26:43,110 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
> 	at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
> 	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
> 	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> 	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> 	... 10 more
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
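The precondition the ticket suggests can be sketched in plain Java. The method below only mimics the shape of `KeyGroupRangeAssignment.assignToKeyGroup`: the real implementation uses murmur hashing rather than `floorMod`, and the message text is hypothetical. What matters is failing fast with a descriptive message instead of a bare NullPointerException deep inside the runtime.

```java
// Illustrative sketch of the suggested precondition, not Flink's actual code.
// The real assignToKeyGroup uses murmur hashing; floorMod keeps the sketch
// self-contained while preserving the "hash into a key-group range" shape.
final class KeyGroupSketch {

    static int assignToKeyGroup(Object key, int maxParallelism) {
        if (key == null) {
            // fail fast with a message that points the user at the actual cause
            throw new NullPointerException(
                "Assigned key must not be null. Did the KeySelector return null?");
        }
        return Math.floorMod(key.hashCode(), maxParallelism);
    }
}
```

A check like this turns the opaque stack trace quoted above into an error that names the real problem (a KeySelector returning null), which is exactly the trade-off the discussion weighs against adding a branch on the critical path.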
[jira] [Commented] (FLINK-10415) RestClient does not react to lost connection
[ https://issues.apache.org/jira/browse/FLINK-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629096#comment-16629096 ] ASF GitHub Bot commented on FLINK-10415:

zentol commented on a change in pull request #6763: [FLINK-10415] Fail response future if connection closes in RestClient
URL: https://github.com/apache/flink/pull/6763#discussion_r220638812

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java

@@ -89,7 +91,78 @@ public void testInvalidVersionRejection() throws Exception {
 		} catch (IllegalArgumentException e) {
 			// expected
 		}
+	}
+
+	/**
+	 * Tests that we fail the operation if the remote connection closes.
+	 */
+	@Test
+	public void testConnectionClosedHandling() throws Exception {
+		final ServerSocket serverSocket = new ServerSocket(0);
+		final String targetAddress = "localhost";
+		final int targetPort = serverSocket.getLocalPort();
+
+		try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
+			final CompletableFuture responseFuture = restClient.sendRequest(
+				targetAddress,
+				targetPort,
+				new TestMessageHeaders(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				Collections.emptyList());
+
+			// establish connection
+			final Socket connectionSocket = serverSocket.accept();
+
+			// close connection
+			connectionSocket.close();
+
+			try {
+				responseFuture.get();
+			} catch (ExecutionException ee) {
+				assertThat(ExceptionUtils.findThrowable(ee, ConnectionClosedException.class).isPresent(), is(true));
+			}
+		} finally {
+			serverSocket.close();
+		}
+	}
+
+	/**
+	 * Tests that we fail the operation if the client closes.
+	 */
+	@Test
+	public void testRestClientClosedHandling() throws Exception {
+		final ServerSocket serverSocket = new ServerSocket(0);
+		final String targetAddress = "localhost";
+		final int targetPort = serverSocket.getLocalPort();
+		Socket connectionSocket = null;
+
+		try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
+			final CompletableFuture responseFuture = restClient.sendRequest(

Review comment: But it does fail with a different exception, no? Although from a user-perspective the same issue happened.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> RestClient does not react to lost connection
> --------------------------------------------
>
>                 Key: FLINK-10415
>                 URL: https://issues.apache.org/jira/browse/FLINK-10415
>             Project: Flink
>          Issue Type: Bug
>          Components: REST
>    Affects Versions: 1.6.1, 1.7.0, 1.5.4
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
> While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not
> seem to react to lost connections in time. When sending a request to the
> current leader it happened that the leader was killed just after establishing
> the connection. Then the {{RestClient}} did not fail the connection and was
> stuck writing a request or retrieving a response from the lost leader. I'm
> wondering whether we should introduce a {{ReadTimeoutHandler}} and
> {{WriteTimeoutHandler}} to handle these problems.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
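The testing technique in the diff — bind a plain `ServerSocket`, accept the connection, close the server side, then assert the pending response future fails — can be reproduced without any Flink dependency. Everything below, class and method names included, is a hypothetical stand-in for the RestClient, not its actual API.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Self-contained sketch of the test technique above: the "client" is a plain
// socket whose pending response future must fail once the remote side closes.
class ConnectionCloseSketch {

    // stand-in for sendRequest(): completes exceptionally if the remote closes
    static CompletableFuture<byte[]> request(Socket clientSocket) {
        CompletableFuture<byte[]> response = new CompletableFuture<>();
        Thread reader = new Thread(() -> {
            try {
                int b = clientSocket.getInputStream().read();
                if (b == -1) { // remote side closed before any response arrived
                    response.completeExceptionally(
                        new IOException("Connection closed by remote endpoint."));
                } else {
                    response.complete(new byte[] {(byte) b});
                }
            } catch (IOException e) {
                response.completeExceptionally(e);
            }
        });
        reader.setDaemon(true);
        reader.start();
        return response;
    }

    static boolean failsOnRemoteClose() throws Exception {
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket("localhost", server.getLocalPort())) {
            Socket accepted = server.accept(); // establish connection
            accepted.close();                  // ...then drop it
            CompletableFuture<byte[]> response = request(client);
            try {
                response.get();
                return false; // future should not complete normally
            } catch (ExecutionException expected) {
                return true;
            }
        }
    }
}
```

Binding to port 0 lets the OS pick a free port, which is the same trick the test in the diff uses to avoid port collisions on CI machines.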
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629064#comment-16629064 ] ASF GitHub Bot commented on FLINK-9712:

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220634306

## File path: docs/dev/table/streaming/joins.md

@@ -0,0 +1,93 @@
+---
+title: "Joins"
+nav-parent_id: streaming_tableapi
+nav-pos: 3
+---
+
+In batch processing joins are relatively easy, since we are working on a bounded completed data sets.
+In stream processing things are a little bit more complicated, especially when it comes to the issue how to handle that data can change over time.
+Because of that, there are couple of ways to actually perform the join using either Table API or SQL.
+
+For more information regarding syntax please check Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins).
+
+* This will be replaced by the TOC
+{:toc}
+
+Regular Joins
+-------------
+
+This is the most basic case in which any new records or changes to either side of the join input are visible and are affecting whole join result.
+If there is a new record on the left side, it will be joined with all of the previous and future records on the other side.
+
+Such semantic has an important limitation:
+it requires to keep both sides of the join input on the state indefinitely and resource usage will grow indefinitely as well.
+
+Example:
+{% highlight sql %}
+SELECT * FROM Orders
+INNER JOIN Product
+ON Orders.productId = Product.id
+{% endhighlight %}
+
+Time-windowed Joins
+-------------------
+
+In this case we are restricting scope of the join to some time window.
+This allows Flink to remove old values from the state (using [watermarks](time_attributes.html)) without affecting the correctness of the result.
+
+Example:
+{% highlight sql %}
+SELECT *
+FROM
+  Orders o,
+  Shipments s
+WHERE o.id = s.orderId AND
+      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
+{% endhighlight %}
+
+Temporal Table Joins
+--------------------
+
+Temporal Table Joins allow to join a stream (left/probe side) with a table (right/build side) that changes over time.
+For each record from the probe side, it will be joined only with the latest version of the build side.
+That means (in contrast to [Regular Joins](#regular-joins)) if there is a new record on the build side,
+it will not affect the previous past results of the join.

Review comment: "the previous past results" -> "the previous results"

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Support enrichment joins in Flink SQL/Table API
> -----------------------------------------------
>
>                 Key: FLINK-9712
>                 URL: https://issues.apache.org/jira/browse/FLINK-9712
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>
> As described here:
> https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
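The temporal-join semantics the documentation describes — each probe-side record joins the latest build-side version at its timestamp, and later build-side updates never rewrite earlier results — can be illustrated with a small self-contained Java sketch that uses a `TreeMap` as the versioned build side. This illustrates the semantics only, not the Flink Table API; the rates data and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration of temporal table join semantics: for each probe-side record,
// look up the latest build-side version at or before the record's timestamp.
class TemporalJoinSketch {

    // ratesHistory is the versioned build side: version timestamp -> rate
    static List<Double> join(TreeMap<Long, Double> ratesHistory,
                             long[] orderTimes, double[] amounts) {
        List<Double> converted = new ArrayList<>();
        for (int i = 0; i < orderTimes.length; i++) {
            // latest version of the build side as of this probe record's time
            Map.Entry<Long, Double> version = ratesHistory.floorEntry(orderTimes[i]);
            if (version != null) { // no matching version -> no join result
                converted.add(amounts[i] * version.getValue());
            }
        }
        return converted;
    }
}
```

Because each probe record consults only versions at or before its own timestamp, appending a newer version to `ratesHistory` cannot change a result that was already emitted — the property the documentation contrasts with regular joins.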
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629052#comment-16629052 ] ASF GitHub Bot commented on FLINK-9712:

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220613633

## File path: docs/dev/table/tableApi.md

@@ -729,7 +729,7 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

 Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.
-A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.
+A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

Review comment: change link to `streaming/time_attributes.html`

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support enrichment joins in Flink SQL/Table API > --- > > Key: FLINK-9712 > URL: https://issues.apache.org/jira/browse/FLINK-9712 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > As described here: > https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629034#comment-16629034 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220598707 ## File path: docs/dev/table/sql.md ## @@ -316,7 +316,7 @@ GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user Streaming -Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute +Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute Review comment: Link should be `streaming/time_attributes.html` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support enrichment joins in Flink SQL/Table API > --- > > Key: FLINK-9712 > URL: https://issues.apache.org/jira/browse/FLINK-9712 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > As described here: > https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629035#comment-16629035 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220636637 ## File path: docs/dev/table/tableApi.md ## @@ -669,6 +669,30 @@ Table result = orders {% endhighlight %} + + +Temporal Join with Temporal Table +Streaming + + +Temporal Tables are represented as Table Functions. Review comment: See comments on sql.md This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support enrichment joins in Flink SQL/Table API > --- > > Key: FLINK-9712 > URL: https://issues.apache.org/jira/browse/FLINK-9712 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > As described here: > https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629028#comment-16629028 ] ASF GitHub Bot commented on FLINK-9712:

fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r220613399

## File path: docs/dev/table/tableApi.md

@@ -608,7 +608,7 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");

 Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.
-A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.
+A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

Review comment: change link to `streaming/time_attributes.html`

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629044#comment-16629044 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220604070 ## File path: docs/dev/table/sql.md ## @@ -769,15 +769,15 @@ The start and end timestamps of group windows as well as time attributes can be SESSION_ROWTIME(time_attr, interval) Returns the timestamp of the inclusive upper bound of the corresponding tumbling, hopping, or session window. - The resulting attribute is a rowtime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations. + The resulting attribute is a rowtime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations. TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) - Returns a proctime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations. + Returns a proctime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations. Review comment: link should be `streaming/time_attributes.html#processing-time` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629091#comment-16629091 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220635686 ## File path: docs/dev/table/streaming/joins.md ## @@ -0,0 +1,93 @@ +--- +title: "Joins" +nav-parent_id: streaming_tableapi +nav-pos: 3 +--- + + +In batch processing joins are relatively easy, since we are working on a bounded completed data sets. +In stream processing things are a little bit more complicated, especially when it comes to the issue how to handle that data can change over time. +Because of that, there are couple of ways to actually perform the join using either Table API or SQL. + +For more information regarding syntax please check Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins). + +* This will be replaced by the TOC +{:toc} + +Regular Joins +- + +This is the most basic case in which any new records or changes to either side of the join input are visible and are affecting whole join result. +If there is a new record on the left side, it will be joined with all of the previous and future records on the other side. + +Such semantic has an important limitation: +it requires to keep both sides of the join input on the state indefinitely and resource usage will grow indefinitely as well. + +Example: +{% highlight sql %} +SELECT * FROM Orders +INNER JOIN Product +ON Orders.productId = Product.id +{% endhighlight %} + +Time-windowed Joins +--- + +In this case we are restricting scope of the join to some time window. +This allows Flink to remove old values from the state (using [watermarks](time_attributes.html) without affecting the correctness of the result. 
+ +Example: +{% highlight sql %} +SELECT * +FROM + Orders o, + Shipments s +WHERE o.id = s.orderId AND + o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime +{% endhighlight %} + +Temporal Table Joins + + +Temporal Table Joins allow to join a stream (left/probe side) with a table (right/build side) that changes over time. +For each record from the probe side, it will be joined only with the latest version of the build side. +That means (in contrast to [Regular Joins](#regular-joins)) if there is a new record on the build side, +it will not affect the previous past results of the join. +This again allow Flink to limit the number of elements that must be kept on the state. +In order to support updates (overwrites) of previous values on the build side table, this table must define a primary key. + +Compared to [Time-windowed Joins](#time-windowed-joins), +Temporal Table Joins are not defining a time window within which bounds the records will be joined. +Records from the probe side are joined with the most recent versions of the build side and records on the build side might be arbitrary old. Review comment: We need to rephrase "most recent version" once we support event time. Maybe it makes sense to explain this a bit more general. This would also help to understand the syntax better (why do we need to pass a proc time attribute into the temporal table function?) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
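The processing-time temporal table join semantics discussed in this thread (each probe-side record joins only the latest build-side version, and later build-side updates do not change earlier results) can be illustrated outside of Flink. Below is a minimal Python sketch of those semantics; it is not Flink code, and the table contents (currency rates, order IDs) are invented for illustration:

```python
# Minimal sketch of processing-time temporal table join semantics
# (illustration only, not Flink code; sample data is invented).

def temporal_join(events):
    """Process an interleaved stream of build-side updates and
    probe-side records; emit joined results as they arrive."""
    latest = {}    # build side: primary key -> latest version
    results = []
    for side, record in events:
        if side == "build":
            key, value = record
            latest[key] = value          # overwrite the previous version
        else:                            # probe side
            key, payload = record
            if key in latest:            # inner-join semantics
                results.append((payload, latest[key]))
    return results

events = [
    ("build", ("EUR", 1.10)),
    ("probe", ("EUR", "order-1")),   # joins rate 1.10
    ("build", ("EUR", 1.20)),        # does NOT change order-1's result
    ("probe", ("EUR", "order-2")),   # joins rate 1.20
]
print(temporal_join(events))
# [('order-1', 1.1), ('order-2', 1.2)]
```

This also shows why only a bounded amount of state is needed: the join keeps one (latest) version per key, instead of the full history required by a regular streaming join.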
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629030#comment-16629030 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220598373 ## File path: docs/dev/table/sql.md ## @@ -343,7 +343,7 @@ WINDOW w AS ( {% highlight sql %} SELECT DISTINCT users FROM Orders {% endhighlight %} - Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. + Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: Link should be `streaming/query_configuration.html` and change "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629061#comment-16629061 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220632174 ## File path: docs/dev/table/streaming/joins.md ## @@ -0,0 +1,93 @@ +--- +title: "Joins" +nav-parent_id: streaming_tableapi +nav-pos: 3 +--- + + +In batch processing joins are relatively easy, since we are working on a bounded completed data sets. +In stream processing things are a little bit more complicated, especially when it comes to the issue how to handle that data can change over time. +Because of that, there are couple of ways to actually perform the join using either Table API or SQL. + +For more information regarding syntax please check Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins). + +* This will be replaced by the TOC +{:toc} + +Regular Joins +- + +This is the most basic case in which any new records or changes to either side of the join input are visible and are affecting whole join result. +If there is a new record on the left side, it will be joined with all of the previous and future records on the other side. + +Such semantic has an important limitation: +it requires to keep both sides of the join input on the state indefinitely and resource usage will grow indefinitely as well. + +Example: +{% highlight sql %} +SELECT * FROM Orders +INNER JOIN Product +ON Orders.productId = Product.id +{% endhighlight %} + +Time-windowed Joins +--- + +In this case we are restricting scope of the join to some time window. +This allows Flink to remove old values from the state (using [watermarks](time_attributes.html) without affecting the correctness of the result. 
+ +Example: +{% highlight sql %} +SELECT * +FROM + Orders o, + Shipments s +WHERE o.id = s.orderId AND + o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime +{% endhighlight %} + +Temporal Table Joins + + +Temporal Table Joins allow to join a stream (left/probe side) with a table (right/build side) that changes over time. Review comment: "A Temporal Table Join joins an append-only table (left input) with a Temporal Table(link) (right input), i.e., a table that changes over time and tracks its changes. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629065#comment-16629065 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220618395 ## File path: docs/dev/table/sql.md ## @@ -502,6 +502,30 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE Note: Currently, only literal TRUE is supported as predicate for a left outer join against a lateral table. + + +Temporal Join with Temporal Table Review comment: "Temporal Join with Temporal Table" sounds a bit repetitive. How about "Join with Temporal Table"? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629027#comment-16629027 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220599532 ## File path: docs/dev/table/sql.md ## @@ -287,7 +287,7 @@ SELECT PRETTY_PRINT(user) FROM Orders Result Updating -Note: GroupBy on a streaming table produces an updating result. See the Streaming Concepts page for details. +Note: GroupBy on a streaming table produces an updating result. See the Streaming Concepts page for details. Review comment: change link to `streaming/dynamic_tables.html` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629088#comment-16629088 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220630952 ## File path: docs/dev/table/streaming/joins.md ## @@ -0,0 +1,93 @@ +--- +title: "Joins" +nav-parent_id: streaming_tableapi +nav-pos: 3 +--- + + +In batch processing joins are relatively easy, since we are working on a bounded completed data sets. +In stream processing things are a little bit more complicated, especially when it comes to the issue how to handle that data can change over time. +Because of that, there are couple of ways to actually perform the join using either Table API or SQL. + +For more information regarding syntax please check Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins). + +* This will be replaced by the TOC +{:toc} + +Regular Joins +- + +This is the most basic case in which any new records or changes to either side of the join input are visible and are affecting whole join result. +If there is a new record on the left side, it will be joined with all of the previous and future records on the other side. + +Such semantic has an important limitation: +it requires to keep both sides of the join input on the state indefinitely and resource usage will grow indefinitely as well. + +Example: +{% highlight sql %} +SELECT * FROM Orders +INNER JOIN Product +ON Orders.productId = Product.id +{% endhighlight %} + +Time-windowed Joins +--- + +In this case we are restricting scope of the join to some time window. Review comment: "A time-windowed join is defined by a join predicate that checks if the time attributes(link) of the input records are within a time-window. Since time attributes are quasi-monontic increasing, Flink can remove ..." 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
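The windowed-join condition from the SQL example quoted in this thread (`o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime`) is just a bounded time predicate; because the bound is finite, a streaming engine can discard state older than the window. A rough Python illustration of that predicate, with invented sample data (not Flink code):

```python
from datetime import datetime, timedelta

def within_window(ordertime, shiptime, bound=timedelta(hours=4)):
    """Mirrors: o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime"""
    return shiptime - bound <= ordertime <= shiptime

# Invented sample rows: (id, timestamp)
orders = [("o1", datetime(2018, 9, 26, 10, 0)),
          ("o2", datetime(2018, 9, 26, 3, 0))]
shipments = [("o1", datetime(2018, 9, 26, 12, 0)),
             ("o2", datetime(2018, 9, 26, 12, 0))]

# Equi-join on id plus the bounded time condition.
joined = [(oid, ot, st)
          for oid, ot in orders
          for sid, st in shipments
          if oid == sid and within_window(ot, st)]
print(joined)  # only o1 qualifies: o2's ordertime is more than 4 hours before its shiptime
```

Note that both sides of the predicate use time attributes of the same type, matching the requirement stated in the quoted tableApi.md text.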
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629059#comment-16629059 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220603395 ## File path: docs/dev/table/sql.md ## @@ -760,7 +760,7 @@ The start and end timestamps of group windows as well as time attributes can be SESSION_END(time_attr, interval) Returns the timestamp of the exclusive upper bound of the corresponding tumbling, hopping, or session window. -Note: The exclusive upper bound timestamp cannot be used as a rowtime attribute in subsequent time-based operations, such as time-windowed joins and group window or over window aggregations. +Note: The exclusive upper bound timestamp cannot be used as a rowtime attribute in subsequent time-based operations, such as time-windowed joins and group window or over window aggregations. Review comment: link should be `streaming/time_attributes.html`, change "rowtime attribute" to "time attribute" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629072#comment-16629072 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220625871 ## File path: docs/dev/table/streaming/joins.md ## @@ -0,0 +1,93 @@ +--- +title: "Joins" +nav-parent_id: streaming_tableapi +nav-pos: 3 +--- + + +In batch processing joins are relatively easy, since we are working on a bounded completed data sets. +In stream processing things are a little bit more complicated, especially when it comes to the issue how to handle that data can change over time. +Because of that, there are couple of ways to actually perform the join using either Table API or SQL. + +For more information regarding syntax please check Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins). Review comment: `the Joins sections` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629080#comment-16629080 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220613752 ## File path: docs/dev/table/tableApi.md ## @@ -1035,7 +1035,7 @@ val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'a) val result = left.select('a, 'b, 'c).where('a.in(right)) {% endhighlight %} -Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: change link to `streaming/query_configuration.html` and "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629038#comment-16629038 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220600142 ## File path: docs/dev/table/functions.md ## @@ -459,7 +459,7 @@ ANY.in(TABLE) Returns TRUE if ANY is equal to a row returned by sub-query TABLE. -Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: change link to `streaming/query_configuration.html` and "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629053#comment-16629053 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220600253 ## File path: docs/dev/table/functions.md ## @@ -639,7 +639,7 @@ ANY.in(TABLE) Returns TRUE if ANY is equal to a row returned by sub-query TABLE. -Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: change link to `streaming/query_configuration.html` and "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629083#comment-16629083 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220614156 ## File path: docs/dev/table/tableApi.md ## @@ -1430,7 +1430,7 @@ A session window is defined by using the `Session` class as follows: on - The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a declared event-time or processing-time time attribute. + The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a declared event-time or processing-time time attribute. Review comment: change link to `streaming/time_attributes.html` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629069#comment-16629069 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220612780 ## File path: docs/dev/table/tableApi.md ## @@ -420,7 +420,7 @@ orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctRes Table orders = tableEnv.scan("Orders"); Table result = orders.distinct(); {% endhighlight %} -Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: change link to `streaming/query_configuration.html` and "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629049#comment-16629049 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220602885 ## File path: docs/dev/table/sql.md ## @@ -728,7 +728,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que Time Attributes -For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming.html#time-attributes) to learn how to define time attributes. +For SQL queries on streaming tables, the `time_attr` argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the [documentation of time attributes](streaming#time_attributes.html) to learn how to define time attributes. Review comment: link should be `streaming/time_attributes.html` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629037#comment-16629037 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220602421 ## File path: docs/dev/table/sql.md ## @@ -591,7 +591,7 @@ WHERE product IN ( SELECT product FROM NewProducts ) {% endhighlight %} -Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: Link should be `streaming/query_configuration.html` and change "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629071#comment-16629071 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220629382 ## File path: docs/dev/table/streaming/joins.md ## @@ -0,0 +1,93 @@ +--- +title: "Joins" +nav-parent_id: streaming_tableapi +nav-pos: 3 +--- + + +In batch processing joins are relatively easy, since we are working on a bounded completed data sets. +In stream processing things are a little bit more complicated, especially when it comes to the issue how to handle that data can change over time. +Because of that, there are couple of ways to actually perform the join using either Table API or SQL. + +For more information regarding syntax please check Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins). + +* This will be replaced by the TOC +{:toc} + +Regular Joins +- + +This is the most basic case in which any new records or changes to either side of the join input are visible and are affecting whole join result. +If there is a new record on the left side, it will be joined with all of the previous and future records on the other side. Review comment: "For example, if there is a new ..." This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
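The regular-join behavior discussed in the review above (a new record on one side joins with all previous and future records on the other side, so both inputs must be retained in state indefinitely) can be sketched as follows. This is an illustrative sketch, not Flink code; the event format, the `regular_join` helper, and the order/product sample data are all hypothetical.

```python
# Hedged sketch (not Flink code) of regular stream-join semantics: every new
# record joins against all records ever seen on the other side, so the state
# for both inputs grows without bound.

def regular_join(stream, state_left, state_right, key_left, key_right):
    """stream: iterable of ("L"/"R", record) events; returns emitted join pairs."""
    out = []
    for side, rec in stream:
        if side == "L":
            state_left.append(rec)   # left state grows without bound
            out += [(rec, r) for r in state_right if key_left(rec) == key_right(r)]
        else:
            state_right.append(rec)  # right state grows without bound
            out += [(l, rec) for l in state_left if key_left(l) == key_right(rec)]
    return out

# Orders join Products on productId; both orders eventually pair with the product.
events = [("L", ("o1", "p1")), ("R", ("p1", "Widget")), ("L", ("o2", "p1"))]
pairs = regular_join(events, [], [],
                     key_left=lambda o: o[1], key_right=lambda p: p[0])
print(pairs)  # → [(('o1', 'p1'), ('p1', 'Widget')), (('o2', 'p1'), ('p1', 'Widget'))]
```

This is why the quoted docs recommend a query configuration with a retention interval: without one, nothing in this scheme ever evicts old records.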
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629031#comment-16629031 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220621269 ## File path: docs/dev/table/sql.md ## @@ -502,6 +502,30 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE Note: Currently, only literal TRUE is supported as predicate for a left outer join against a lateral table. + + +Temporal Join with Temporal Table +Streaming + + +Temporal Tables are represented as Table Functions. Review comment: Extend the description? > Temporal Tables(link) are tables that track their changes over time. A Temporal Table Function(link) provides access to the state of a Temporal Table at a specific point in time. The syntax to join a table with a Temporal Table Function is the same as in Join with Table Functions. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629082#comment-16629082 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220618900 ## File path: docs/dev/table/sql.md ## @@ -502,6 +502,30 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE Note: Currently, only literal TRUE is supported as predicate for a left outer join against a lateral table. + + +Temporal Join with Temporal Table Review comment: I think we can remove the link in the heading cell. There's another link in the description cell. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629078#comment-16629078 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220613508 ## File path: docs/dev/table/tableApi.md ## @@ -718,7 +718,7 @@ val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e) val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e) val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e) {% endhighlight %} -Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: change link to `streaming/query_configuration.html` and "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629032#comment-16629032 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220613700 ## File path: docs/dev/table/tableApi.md ## @@ -915,7 +915,7 @@ tableEnv.registerTable("RightTable", right); Table result = left.select("a, b, c").where("a.in(RightTable)"); {% endhighlight %} -Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: change link to `streaming/query_configuration.html` and "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629086#comment-16629086 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220633567 ## File path: docs/dev/table/streaming/joins.md ## @@ -0,0 +1,93 @@ +--- +title: "Joins" +nav-parent_id: streaming_tableapi +nav-pos: 3 +--- + + +In batch processing joins are relatively easy, since we are working on a bounded completed data sets. +In stream processing things are a little bit more complicated, especially when it comes to the issue how to handle that data can change over time. +Because of that, there are couple of ways to actually perform the join using either Table API or SQL. + +For more information regarding syntax please check Joins sections in [Table API](../tableApi.html#joins) and [SQL](../sql.html#joins). + +* This will be replaced by the TOC +{:toc} + +Regular Joins +- + +This is the most basic case in which any new records or changes to either side of the join input are visible and are affecting whole join result. +If there is a new record on the left side, it will be joined with all of the previous and future records on the other side. + +Such semantic has an important limitation: +it requires to keep both sides of the join input on the state indefinitely and resource usage will grow indefinitely as well. + +Example: +{% highlight sql %} +SELECT * FROM Orders +INNER JOIN Product +ON Orders.productId = Product.id +{% endhighlight %} + +Time-windowed Joins +--- + +In this case we are restricting scope of the join to some time window. +This allows Flink to remove old values from the state (using [watermarks](time_attributes.html) without affecting the correctness of the result. 
+ +Example: +{% highlight sql %} +SELECT * +FROM + Orders o, + Shipments s +WHERE o.id = s.orderId AND + o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime +{% endhighlight %} + +Temporal Table Joins + + +Temporal Table Joins allow to join a stream (left/probe side) with a table (right/build side) that changes over time. +For each record from the probe side, it will be joined only with the latest version of the build side. +That means (in contrast to [Regular Joins](#regular-joins)) if there is a new record on the build side, +it will not affect the previous past results of the join. +This again allow Flink to limit the number of elements that must be kept on the state. +In order to support updates (overwrites) of previous values on the build side table, this table must define a primary key. + +Compared to [Time-windowed Joins](#time-windowed-joins), +Temporal Table Joins are not defining a time window within which bounds the records will be joined. +Records from the probe side are joined with the most recent versions of the build side and records on the build side might be arbitrary old. +As time passes the previous, no longer needed, versions of the record (for the given primary key) will be removed from the state. + +Example: +{% highlight sql %} +SELECT + o.amount * r.rate AS amount +FROM + Orders AS o, + LATERAL TABLE (Rates(o.rowtime)) AS r Review comment: `o.rowtime` -> `o.proctime` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
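The temporal table join semantics quoted in the joins.md draft above (each probe-side record joins only the latest build-side version for its primary key, and older versions can be dropped from state) can be sketched like this. This is an illustrative sketch, not Flink code; the `temporal_join` helper, the integer rate values, and the orders/rates sample data are all hypothetical.

```python
# Hedged sketch (not Flink code) of temporal table join semantics: the build
# side (rates) is keyed by a primary key, each update overwrites the previous
# version, and a probe record (order) joins with the version current at its time.
# Integer rates are used to keep the expected results exact.

def temporal_join(orders, rate_changes):
    """orders: list of (time, currency, amount); rate_changes: list of (time, currency, rate)."""
    latest = {}  # primary key (currency) -> latest rate; old versions are discarded
    events = sorted(
        [(t, 0, c, r) for t, c, r in rate_changes] +
        [(t, 1, c, a) for t, c, a in orders]
    )  # build-side updates (kind 0) apply before probe records (kind 1) at the same time
    out = []
    for t, kind, currency, value in events:
        if kind == 0:
            latest[currency] = value  # overwrite: update for this primary key
        elif currency in latest:
            out.append((t, currency, value * latest[currency]))  # amount * rate
    return out

rates = [(1, "EUR", 110), (5, "EUR", 120)]
orders = [(3, "EUR", 100), (6, "EUR", 100)]
print(temporal_join(orders, rates))  # → [(3, 'EUR', 11000), (6, 'EUR', 12000)]
```

Note how the order at time 3 uses the rate from time 1 and is unaffected by the later update at time 5, in contrast to a regular join where the new build-side record would also pair with past probe records.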
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629066#comment-16629066 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220609016 ## File path: docs/dev/table/streaming/index.md ## @@ -0,0 +1,73 @@ +--- +title: "Streaming Concepts" +nav-id: streaming_tableapi +nav-parent_id: tableapi +nav-pos: 10 +is_beta: false +nav-show_overview: true +--- + + +Flink's [Table API](../tableApi.html) and [SQL support](../sql.html) are unified APIs for batch and stream processing. This means that Table API and SQL queries have the same semantics regardless whether their input is bounded batch input or unbounded stream input. Because the relational algebra and SQL were originally designed for batch processing, relational queries on unbounded streaming input are not as well understood as relational queries on bounded batch input. + +In this section, we explain concepts, practical limitations, and stream-specific configuration parameters of Flink's relational APIs on streaming data. + +Relational Queries on Data Streams +-- + +SQL and the relational algebra have not been designed with streaming data in mind. As a consequence, there are few conceptual gaps between relational algebra (and SQL) and stream processing. + + + + Relational Algebra / SQL + Stream Processing + + + Relations (or tables) are bounded (multi-)sets of tuples. + A stream is an infinite sequences of tuples. + + + A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data. + A streaming query cannot access all data when is started and has to "wait" for data to be streamed in. + + + A batch query terminates after it produced a fixed sized result. 
+ A streaming query continuously updates its result based on the received records and never completes. + + + +Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes outdated when the base tables of its definition query are modified. *Eager View Maintenance* is a technique to update materialized views and updates a materialized view as soon as its base tables are updated. + +The connection between eager view maintenance and SQL queries on streams becomes obvious if we consider the following: + +- A database table is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*. +- A materialized view is defined as a SQL query. In order to update the view, the query is continuously processes the changelog streams of the view's base relations. +- The materialized view is the result of the streaming SQL query. + +With these points in mind, we introduce following concepts: + +* [Dynamic Tables Continuous Queries]({{ site.baseurl }}/dev/table/streaming/dynamic_tables.html) +* [Time attributes]({{ site.baseurl }}/dev/table/streaming/time_attributes.html) +* [Joins]({{ site.baseurl }}/dev/table/streaming/joins.html) Review comment: Change "Joins" to "Joins in Continuous Queries" to emphasize that this is not about joins in general? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
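The eager view maintenance idea in the streaming concepts page quoted above (a table is the result of a changelog stream of `INSERT`/`UPDATE`/`DELETE` statements, and a materialized view is kept current as each change arrives) can be sketched as follows. This is an illustrative sketch under assumed names; the changelog tuple format, the `apply_changelog` helper, and the per-user count query are all hypothetical.

```python
# Hedged sketch of eager materialized-view maintenance: apply a changelog
# stream to a keyed table, and re-derive a view ("SELECT user, COUNT(*)
# ... GROUP BY user") after every change.

def apply_changelog(changelog):
    table = {}  # primary key -> row
    views = []  # materialized view after each change
    for op, key, row in changelog:
        if op in ("INSERT", "UPDATE"):
            table[key] = row
        elif op == "DELETE":
            table.pop(key, None)
        # eagerly re-materialize the per-user row count
        view = {}
        for r in table.values():
            view[r["user"]] = view.get(r["user"], 0) + 1
        views.append(view)
    return views

log = [
    ("INSERT", 1, {"user": "alice"}),
    ("INSERT", 2, {"user": "bob"}),
    ("DELETE", 1, None),
]
print(apply_changelog(log))  # → [{'alice': 1}, {'alice': 1, 'bob': 1}, {'bob': 1}]
```

A real system would update the view incrementally rather than recomputing it, but the sequence of view states is exactly what a continuous streaming query emits.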
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629046#comment-16629046 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220601206 ## File path: docs/dev/table/functions.md ## @@ -297,7 +297,7 @@ value IN (sub-query) Returns TRUE if value is equal to a row returned by sub-query. -Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: change link to `streaming/query_configuration.html` and "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629050#comment-16629050 ] ASF GitHub Bot commented on FLINK-9712: --- fhueske commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins URL: https://github.com/apache/flink/pull/6741#discussion_r220601252 ## File path: docs/dev/table/functions.md ## @@ -285,7 +285,7 @@ EXISTS (sub-query) Returns TRUE if sub-query returns at least one row. Only supported if the operation can be rewritten in a join and group operation. -Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. +Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. Review comment: change link to `streaming/query_configuration.html` and "Streaming Concepts" to "Query Configuration" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 