[jira] [Created] (FLINK-4420) Introduce star(*) to select all of the columns in the table
Jark Wu created FLINK-4420: -- Summary: Introduce star(*) to select all of the columns in the table Key: FLINK-4420 URL: https://issues.apache.org/jira/browse/FLINK-4420 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Jark Wu Assignee: Jark Wu When a table has many columns, it would be much easier to select all of them if we supported {{select *}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4435) Replace Guava's VisibleForTesting annotation with Flink's annotation
[ https://issues.apache.org/jira/browse/FLINK-4435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4435: -- Assignee: Jark Wu > Replace Guava's VisibleForTesting annotation with Flink's annotation > > > Key: FLINK-4435 > URL: https://issues.apache.org/jira/browse/FLINK-4435 > Project: Flink > Issue Type: Improvement >Reporter: Stephan Ewen >Assignee: Jark Wu > Fix For: 1.2.0 > > > All Guava usages of the {{VisibleForTesting}} annotation should be replaced > by Flink's {{VisibleForTesting}} annotation. > This should also add a checkstyle rule that prevents the use of Guava's > annotation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4453) Scala code example in Window documentation shows Java
[ https://issues.apache.org/jira/browse/FLINK-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4453: -- Assignee: Jark Wu > Scala code example in Window documentation shows Java > - > > Key: FLINK-4453 > URL: https://issues.apache.org/jira/browse/FLINK-4453 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Jark Wu >Priority: Trivial > > The first code example in the section "WindowFunction - The Generic Case" of > the window documentation of the 1.2 SNAPSHOT > (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#windowfunction---the-generic-case) > shows Java code in the Scala tab. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4469) Add support for user defined table function in Table API & SQL
Jark Wu created FLINK-4469: -- Summary: Add support for user defined table function in Table API & SQL Key: FLINK-4469 URL: https://issues.apache.org/jira/browse/FLINK-4469 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Jark Wu Assignee: Jark Wu Normal user-defined functions, such as concat(), take a single input row and produce a single output row. In contrast, table-generating functions transform a single input row into multiple output rows. This is very useful in some cases, such as looking up HBase by rowkey and returning one or more rows. Adding a user defined table function should: 1. inherit from the UDTF class with a specific generic type T 2. define one or more eval methods. NOTE: 1. the eval method must be public and non-static. 2. eval should always return java.lang.Iterable or scala.collection.Iterable with the generic type T. 3. the generic type T is the row type returned by the table function. Because of Java type erasure, we can't extract T from the Iterable. 4. the eval method can be overloaded. Flink will choose the best matching eval method to call according to parameter types and count.
{code}
public class Word {
  public String word;
  public Integer length;

  public Word(String word, Integer length) {
    this.word = word;
    this.length = length;
  }
}

public class SplitStringUDTF extends UDTF<Word> {
  public Iterable<Word> eval(String str) {
    if (str == null) {
      return new ArrayList<>();
    } else {
      List<Word> list = new ArrayList<>();
      for (String s : str.split(",")) {
        list.add(new Word(s, s.length()));
      }
      return list;
    }
  }
}

// in SQL
tableEnv.registerFunction("split", new SplitStringUDTF())
tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)")

// in Java Table API
tableEnv.registerFunction("split", new SplitStringUDTF())
// rename the split table columns to "w" and "l"
table.crossApply("split(c)", "w, l")
     .select("a, b, w, l")
// without renaming, we will use the original field names of the POJO/case/...
table.crossApply("split(c)")
     .select("a, b, word, length")

// in Scala Table API
val split = new SplitStringUDTF()
table.crossApply(split('c), 'w, 'l)
     .select('a, 'b, 'w, 'l)
// outerApply for an outer join against a UDTF
table.outerApply(split('c))
     .select('a, 'b, 'word, 'length)
{code}
Here we introduce the CROSS/OUTER APPLY keywords, as used in SQL Server, to join table functions. We can discuss the API in the comments. Maybe the {{UDTF}} class should be replaced by {{TableFunction}} or something else, because we have introduced {{ScalarFunction}} for custom scalar functions and the naming should stay consistent. Still, I prefer {{UDTF}} over {{TableFunction}}, as the former is more SQL-like and the latter may be confused with DataStream functions. **This issue is blocked by CALCITE-1309, so we need to wait for Calcite to fix this and release.** See [1] for more information about the UDTF design. [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
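Since the snippet in the proposal is only a fragment, the Iterable-returning eval contract described above can be sketched as self-contained plain Java. This is not Flink code: the `UDTF` base class here is a bare stand-in, and the driver loop in `main` is a hypothetical substitute for the runtime's actual invocation path.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {

    // Bare stand-in for the proposed UDTF base class (no Flink dependency).
    static abstract class UDTF<T> { }

    static class Word {
        final String word;
        final int length;
        Word(String word, int length) { this.word = word; this.length = length; }
    }

    // eval is public and non-static, and returns Iterable<T>, as the proposal requires.
    static class SplitStringUDTF extends UDTF<Word> {
        public Iterable<Word> eval(String str) {
            List<Word> list = new ArrayList<>();
            if (str != null) {
                for (String s : str.split(",")) {
                    list.add(new Word(s, s.length()));
                }
            }
            return list;
        }
    }

    public static void main(String[] args) {
        // Hypothetical driver: one input row fans out into multiple output rows.
        for (Word w : new SplitStringUDTF().eval("Hello,World")) {
            System.out.println(w.word + ":" + w.length);
        }
    }
}
```

Returning an `Iterable` keeps the contract simple, at the cost of materializing all output rows before they are consumed, which is exactly what the collector-based alternative discussed in the comments below avoids.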
[jira] [Created] (FLINK-4503) Remove with method from CoGroupedStream and JoinedStream, and change apply method return type
Jark Wu created FLINK-4503: -- Summary: Remove with method from CoGroupedStream and JoinedStream, and change apply method return type Key: FLINK-4503 URL: https://issues.apache.org/jira/browse/FLINK-4503 Project: Flink Issue Type: Sub-task Reporter: Jark Wu We introduced (and immediately deprecated) the with(...) method in FLINK-4271. It is a temporary workaround for setting parallelism after co-group and join operator and not breaking binary compatibility. The with(...) method only differs in the return type and calls apply(...), casting the returned value. So we need to remove the {{with(...)}} method in Flink 2.0. And change the apply method return type. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
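The return-type workaround described above is easier to see in a stripped-down sketch. The class names below are illustrative stand-ins, not Flink's real ones:

```java
public class WithPatternDemo {

    static class DataStream { }

    static class SingleOutputStreamOperator extends DataStream {
        int parallelism = 1;
        SingleOutputStreamOperator setParallelism(int p) { this.parallelism = p; return this; }
    }

    static class JoinedStreams {
        // The declared return type is frozen at the general DataStream for
        // binary compatibility, although the returned instance is the richer subtype.
        DataStream apply() {
            return new SingleOutputStreamOperator();
        }

        // The temporary FLINK-4271 workaround: same logic as apply(), but a
        // narrower return type obtained by a cast, so callers can keep chaining
        // setParallelism(...). FLINK-4503 proposes dropping with() and changing
        // apply()'s declared return type directly in Flink 2.0.
        @Deprecated
        SingleOutputStreamOperator with() {
            return (SingleOutputStreamOperator) apply();
        }
    }

    public static void main(String[] args) {
        int p = new JoinedStreams().with().setParallelism(4).parallelism;
        System.out.println(p); // 4
    }
}
```

Once `apply()` is allowed to declare the narrower return type, the cast and the deprecated `with()` become redundant.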
[jira] [Commented] (FLINK-4510) Always create CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15439292#comment-15439292 ] Jark Wu commented on FLINK-4510: Hi [~uce] , I would like to contribute this issue. > Always create CheckpointCoordinator > --- > > Key: FLINK-4510 > URL: https://issues.apache.org/jira/browse/FLINK-4510 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi > > The checkpoint coordinator is only created if a checkpointing interval is > configured. This means that no savepoints can be triggered if there is no > checkpointing interval specified. > Instead we should always create it and allow an interval of 0 for disabled > periodic checkpoints. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
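The proposal can be sketched as follows. This is a hypothetical, simplified model, not the actual CheckpointCoordinator API:

```java
public class CoordinatorDemo {

    static class CheckpointCoordinator {
        final long intervalMillis;
        CheckpointCoordinator(long intervalMillis) { this.intervalMillis = intervalMillis; }

        boolean periodicCheckpointsEnabled() { return intervalMillis > 0; }

        // Savepoints no longer depend on a checkpointing interval being configured.
        boolean canTriggerSavepoint() { return true; }
    }

    static CheckpointCoordinator create(long configuredIntervalMillis) {
        // Before: return null when no interval was configured, which made
        // savepoints impossible. After: always create the coordinator and let
        // an interval of 0 merely disable the periodic trigger.
        return new CheckpointCoordinator(configuredIntervalMillis);
    }

    public static void main(String[] args) {
        CheckpointCoordinator c = create(0);
        System.out.println(c.periodicCheckpointsEnabled()); // false
        System.out.println(c.canTriggerSavepoint());        // true
    }
}
```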
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15448024#comment-15448024 ] Jark Wu commented on FLINK-4469: Agree with that. Using a collector will be more user-friendly and allows further optimization. Instead of taking a {{Collector}} as a parameter, I prefer to define a protected {{collect(T)}} method, so that users do not need to declare the {{Collector}} in the {{eval}} method's parameter list, and the parameter list stays consistent with the call in SQL. Maybe the UDTF signature looks like this:
{code}
abstract class UDTF[T] {
  var collector: Collector[T] = null

  def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }

  def collect(ele: T): Unit = {
    collector.collect(ele)
  }
}
{code}
And we can define a specific UDTF like this:
{code}
public class SplitStringUDTF extends UDTF<Word> {
  public void eval(String str) {
    if (str != null) {
      for (String s : str.split(",")) {
        collect(new Word(s, s.length()));
      }
    }
  }
}

// calling in SQL as usual
tableEnv.registerFunction("split", new SplitStringUDTF())
tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)")
{code}
> Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15448024#comment-15448024 ] Jark Wu edited comment on FLINK-4469 at 8/30/16 4:51 AM: - Agree with that. Using a collector will be more user-friendly and allows further optimization. Instead of taking a {{Collector}} as a parameter, I prefer to define a protected {{collect(T)}} method, so that users do not need to declare the {{Collector}} in the {{eval}} method's parameter list, and the parameter list stays consistent with the call in SQL. Maybe the UDTF signature looks like this:
{code}
abstract class UDTF[T] {
  var collector: Collector[T] = null

  def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }

  def collect(ele: T): Unit = {
    collector.collect(ele)
  }
}
{code}
And we can define a specific UDTF like this:
{code}
public class SplitStringUDTF extends UDTF<Word> {
  public void eval(String str) {
    if (str != null) {
      for (String s : str.split(",")) {
        collect(new Word(s, s.length()));
      }
    }
  }
}

// calling in SQL as usual
tableEnv.registerFunction("split", new SplitStringUDTF())
tableEnv.sql("SELECT a, b, t.* FROM MyTable CROSS APPLY split(c) AS t(w,l)")
{code}
was (Author: jark): the same comment, written with single-brace markup ({Collector}, {collect(T)}, {eval}) instead of the {{...}} markup above.
> Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu
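The collector-based contract proposed in the comment above can be rendered as self-contained plain Java. `Collector` here is a minimal stand-in, not Flink's actual interface, and the wiring in `main` is a hypothetical substitute for the runtime:

```java
import java.util.ArrayList;
import java.util.List;

public class CollectorUdtfDemo {

    // Minimal stand-in, not Flink's Collector interface.
    interface Collector<T> { void collect(T record); }

    static abstract class UDTF<T> {
        private Collector<T> collector;
        void setCollector(Collector<T> collector) { this.collector = collector; }
        protected void collect(T record) { collector.collect(record); }
    }

    static class Word {
        final String word;
        final int length;
        Word(String word, int length) { this.word = word; this.length = length; }
    }

    static class SplitStringUDTF extends UDTF<Word> {
        // eval's parameter list now matches the SQL call split(c) exactly:
        // no Collector argument; rows are emitted through collect(...).
        public void eval(String str) {
            if (str != null) {
                for (String s : str.split(",")) {
                    collect(new Word(s, s.length()));
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        SplitStringUDTF udtf = new SplitStringUDTF();
        udtf.setCollector(w -> out.add(w.word + ":" + w.length));
        udtf.eval("Hello,World");
        System.out.println(out); // [Hello:5, World:5]
    }
}
```

Compared with the Iterable-returning version, rows are pushed out one at a time, so the runtime can forward them downstream without buffering the whole result.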
[jira] [Created] (FLINK-4546) Remove STREAM keyword and use batch sql parser for stream jobs
Jark Wu created FLINK-4546: -- Summary: Remove STREAM keyword and use batch sql parser for stream jobs Key: FLINK-4546 URL: https://issues.apache.org/jira/browse/FLINK-4546 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Jark Wu Assignee: Jark Wu This is about unifying the Batch SQL and Stream SQL grammar, especially removing the STREAM keyword in Stream SQL. Detailed discussion on the mailing list: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4546) Remove STREAM keyword and use batch sql parser for stream jobs
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-4546: --- Issue Type: New Feature (was: Bug) > Remove STREAM keyword and use batch sql parser for stream jobs > -- > > Key: FLINK-4546 > URL: https://issues.apache.org/jira/browse/FLINK-4546 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu > > This is about unifying the Batch SQL and Stream SQL grammar, especially removing the STREAM > keyword in Stream SQL. > Detailed discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4546) Remove STREAM keyword in Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-4546: --- Summary: Remove STREAM keyword in Stream SQL (was: Remove STREAM keyword and use batch sql parser for stream jobs) > Remove STREAM keyword in Stream SQL > > > Key: FLINK-4546 > URL: https://issues.apache.org/jira/browse/FLINK-4546 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu > > This is about unifying the Batch SQL and Stream SQL grammar, especially removing the STREAM > keyword in Stream SQL. > Detailed discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4579) Add StateBackendFactory for RocksDB Backend
[ https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15464832#comment-15464832 ] Jark Wu commented on FLINK-4579: +1 for adding RocksDB to the standard distribution lib. So we only need to change the flink-dist pom setting, right? > Add StateBackendFactory for RocksDB Backend > --- > > Key: FLINK-4579 > URL: https://issues.apache.org/jira/browse/FLINK-4579 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Reporter: Aljoscha Krettek > > Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}}, > which means that users cannot specify the RocksDB backend in the Flink > configuration. > If we add a factory for RocksDB, we should also think about adding the RocksDB > backend to the standard distribution lib; otherwise it is only usable if > users manually place the RocksDB jars in the Flink lib folder. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15469402#comment-15469402 ] Jark Wu commented on FLINK-4565: I think what is confusing [~chobeat] is what the counterpart of "IN" is in DataSet transformations, right? Maybe we can use broadcast variables [1] to support this feature. In addition, [FLINK-4541] could perhaps be implemented together with this. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html#broadcast-variables > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
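Why broadcast variables fit can be seen in a plain-Java sketch (not Flink code): an uncorrelated IN reduces to a filter against the set of values produced by the sub-query, which is exactly what a broadcast variable would ship to every task.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InOperatorDemo {
    public static void main(String[] args) {
        // The "broadcast" side: the materialized result of the sub-query.
        Set<Integer> subQueryValues = new HashSet<>(Arrays.asList(2, 4));

        // WHERE x IN (sub-query) becomes a filter against that set.
        List<Integer> result = Stream.of(1, 2, 3, 4, 5)
                .filter(subQueryValues::contains)
                .collect(Collectors.toList());

        System.out.println(result); // [2, 4]
    }
}
```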
[jira] [Commented] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15472401#comment-15472401 ] Jark Wu commented on FLINK-4591: It seems that {{GROUP BY *}} is not allowed in SQL. Maybe we can throw a better exception that explains this when a user uses star in groupBy. Currently, the exception is "cannot resolve [*] given input [a, b, c]", which is not clear. > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > > It would be consistent if this also worked: > {{table.groupBy('*).select('*)}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4597) Improve Scalar Function section in Table API documentation
Jark Wu created FLINK-4597: -- Summary: Improve Scalar Function section in Table API documentation Key: FLINK-4597 URL: https://issues.apache.org/jira/browse/FLINK-4597 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Jark Wu Assignee: Jark Wu Priority: Minor The function signatures in the Scalar Function section are a little confusing, because it's hard to distinguish keywords from parameters. For example, in {{EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)}}, a user may not realize at first glance that TEMPORAL is a parameter. I propose to put {{<>}} around parameters, i.e. {{EXTRACT(<TIMEINTERVALUNIT> FROM <TEMPORAL>)}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4598) Support NULLIF in Table API
Jark Wu created FLINK-4598: -- Summary: Support NULLIF in Table API Key: FLINK-4598 URL: https://issues.apache.org/jira/browse/FLINK-4598 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Jark Wu Assignee: Jark Wu This could be a subtask of [FLINK-4549]. As Flink SQL already supports {{NULLIF}} implicitly, we should support it in the Table API as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4597) Improve Scalar Function section in Table API documentation
[ https://issues.apache.org/jira/browse/FLINK-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473160#comment-15473160 ] Jark Wu commented on FLINK-4597: I had a look at the Calcite documentation [1]; this syntax is concise and clear. +1 for this, and I will close this issue. [1] https://calcite.apache.org/docs/reference.html#comparison-operators > Improve Scalar Function section in Table API documentation > -- > > Key: FLINK-4597 > URL: https://issues.apache.org/jira/browse/FLINK-4597 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu > Priority: Minor > > The function signatures in the Scalar Function section are a little confusing, > because it's hard to distinguish keywords from parameters. For example, in > {{EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)}}, a user may not realize at first glance that TEMPORAL is a > parameter. I propose to put {{<>}} around parameters, i.e. > {{EXTRACT(<TIMEINTERVALUNIT> FROM <TEMPORAL>)}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4597) Improve Scalar Function section in Table API documentation
[ https://issues.apache.org/jira/browse/FLINK-4597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-4597. -- Resolution: Duplicate > Improve Scalar Function section in Table API documentation > -- > > Key: FLINK-4597 > URL: https://issues.apache.org/jira/browse/FLINK-4597 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu > Priority: Minor > > The function signatures in the Scalar Function section are a little confusing, > because it's hard to distinguish keywords from parameters. For example, in > {{EXTRACT(TIMEINTERVALUNIT FROM TEMPORAL)}}, a user may not realize at first glance that TEMPORAL is a > parameter. I propose to put {{<>}} around parameters, i.e. > {{EXTRACT(<TIMEINTERVALUNIT> FROM <TEMPORAL>)}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4598) Support NULLIF in Table API
[ https://issues.apache.org/jira/browse/FLINK-4598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473200#comment-15473200 ] Jark Wu commented on FLINK-4598: You are right. We should evaluate which functions should be supported in the Table API (functions users ask for or use often). It seems that {{NULLIF}} is a little-known function; I can close this issue if users rarely use it. > Support NULLIF in Table API > > > Key: FLINK-4598 > URL: https://issues.apache.org/jira/browse/FLINK-4598 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu > > This could be a subtask of [FLINK-4549]. As Flink SQL already supports > {{NULLIF}} implicitly, we should support it in the Table API as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
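For reference, standard SQL defines NULLIF(a, b) as CASE WHEN a = b THEN NULL ELSE a END. A minimal plain-Java sketch of those semantics (the class and method names are illustrative, not a proposed API):

```java
import java.util.Objects;

public class NullIfDemo {
    // NULLIF(a, b): NULL when a equals b, otherwise a.
    static Integer nullIf(Integer a, Integer b) {
        return Objects.equals(a, b) ? null : a;
    }

    public static void main(String[] args) {
        System.out.println(nullIf(5, 5)); // null
        System.out.println(nullIf(5, 3)); // 5
    }
}
```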
[jira] [Assigned] (FLINK-4579) Add StateBackendFactory for RocksDB Backend
[ https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4579: -- Assignee: Jark Wu > Add StateBackendFactory for RocksDB Backend > --- > > Key: FLINK-4579 > URL: https://issues.apache.org/jira/browse/FLINK-4579 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek >Assignee: Jark Wu > > Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}} > which means that users cannot specify to use the RocksDB backend in the flink > configuration. > If we add a factory for rocksdb we should also think about adding the rocksdb > backend to the standard distribution lib, otherwise it is only usable if > users manually place the rocks jars in the Flink lib folder. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4591: -- Assignee: Jark Wu > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Jark Wu > > It would be consistent if this also worked: > {{table.groupBy('*).select('*)}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15476058#comment-15476058 ] Jark Wu commented on FLINK-4591: Makes sense. I will look into it. > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > > It would be consistent if this also worked: > {{table.groupBy('*).select('*)}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15476147#comment-15476147 ] Jark Wu commented on FLINK-4591: Makes sense... They are equivalent; we should encourage using {{.distinct()}} rather than {{.groupBy( '* ).select( '* )}}. But should we provide the other choice? > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Jark Wu > > It would be consistent if this also worked: > {{table.groupBy( '* ).select( '* )}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15476177#comment-15476177 ] Jark Wu edited comment on FLINK-4591 at 9/9/16 7:15 AM: Is {{.groupBy( '* ).select( '* , 'a.count )}} a strong case for supporting {{groupBy( '* )}}? was (Author: jark): Is {{.groupBy( '* ).select( '* , 'a.count )}} a strong case for supporting {{groupBy( '* )}} > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Jark Wu > > It would be consistent if this also worked: > {{table.groupBy( '* ).select( '* )}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15476177#comment-15476177 ] Jark Wu commented on FLINK-4591: Is {{.groupBy( '* ).select( '* , 'a.count )}} a strong case for supporting {{groupBy( '* )}} > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Jark Wu > > It would be consistent if this also worked: > {{table.groupBy( '* ).select( '* )}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4598) Support NULLIF in Table API
[ https://issues.apache.org/jira/browse/FLINK-4598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-4598. -- Resolution: Won't Fix > Support NULLIF in Table API > > > Key: FLINK-4598 > URL: https://issues.apache.org/jira/browse/FLINK-4598 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > This could be a subtask of [FLINK-4549]. As Flink SQL has supported > {{NULLIF}} implicitly. We should support it in Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4662) Bump Calcite version up to 1.9
[ https://issues.apache.org/jira/browse/FLINK-4662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4662: -- Assignee: Jark Wu > Bump Calcite version up to 1.9 > -- > > Key: FLINK-4662 > URL: https://issues.apache.org/jira/browse/FLINK-4662 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > Calcite just released the 1.9 version. We should adopt it also in the Table > API especially for FLINK-4294. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4263) SQL's VALUES does not work properly
[ https://issues.apache.org/jira/browse/FLINK-4263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513401#comment-15513401 ] Jark Wu commented on FLINK-4263: It seems that the data types in {{VALUES}} are always RexLiteral which should be basic types and could be serializable ? > SQL's VALUES does not work properly > --- > > Key: FLINK-4263 > URL: https://issues.apache.org/jira/browse/FLINK-4263 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: Jark Wu > > Executing the following SQL leads to very strange output: > {code} > SELECT * > FROM( > VALUES > (1, 2), > (3, 4) > ) AS q (col1, col2)" > {code} > {code} > org.apache.flink.optimizer.CompilerException: Error translating node 'Data > Source "at translateToPlan(DataSetValues.scala:88) > (org.apache.flink.api.table.runtime.ValuesInputFormat)" : NONE [[ > GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties > [ordering=null, grouped=null, unique=null] ]]': Could not write the user code > wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106) > at > org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192) > at > 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170) > at > org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > at > org.apache.flink.api.scala.batch.sql.SortITCase.testOrderByMultipleFieldsWithSql(SortITCase.scala:56) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not write the user code wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:888) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:281) > ... 
51 more > Caused by: java.io.NotSerializableException: org.apache.flink.api.table.Row > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.jav
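The root cause in the trace above is that {{org.apache.flink.api.table.Row}} does not implement {{java.io.Serializable}}, so Java serialization of the user-code wrapper fails. A minimal self-contained sketch of this failure mode (the {{Row}} and {{Wrapper}} classes here are simplified stand-ins for illustration, not Flink's actual classes):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // Hypothetical stand-in for org.apache.flink.api.table.Row: no Serializable marker.
    static class Row {
        Object[] fields;
        Row(Object... fields) { this.fields = fields; }
    }

    // Stand-in for a wrapper (like UserCodeObjectWrapper) that Java-serializes its payload.
    static class Wrapper implements Serializable {
        Object payload; // a non-serializable payload poisons the whole object graph
        Wrapper(Object payload) { this.payload = payload; }
    }

    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the same failure reported in the stack trace above
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // a String payload serializes fine; a Row payload does not
        System.out.println(javaSerializable(new Wrapper("plain string")));
        System.out.println(javaSerializable(new Wrapper(new Row(1, 2))));
    }
}
```

Code generation sidesteps this because the generated input format carries only serializable literals and produces the rows at runtime, instead of shipping Row instances through Java serialization.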
[jira] [Commented] (FLINK-4263) SQL's VALUES does not work properly
[ https://issues.apache.org/jira/browse/FLINK-4263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515209#comment-15515209 ] Jark Wu commented on FLINK-4263: I have no idea how to code generate the values input format. So feel free to assign it to you [~twalthr], or give me some tips about why code generation can solve serialization problem. I will keep watching this issue. > SQL's VALUES does not work properly > --- > > Key: FLINK-4263 > URL: https://issues.apache.org/jira/browse/FLINK-4263 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: Jark Wu > > Executing the following SQL leads to very strange output: > {code} > SELECT * > FROM( > VALUES > (1, 2), > (3, 4) > ) AS q (col1, col2)" > {code} > {code} > org.apache.flink.optimizer.CompilerException: Error translating node 'Data > Source "at translateToPlan(DataSetValues.scala:88) > (org.apache.flink.api.table.runtime.ValuesInputFormat)" : NONE [[ > GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties > [ordering=null, grouped=null, unique=null] ]]': Could not write the user code > wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106) > at > org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) > at > 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170) > at > org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > at > org.apache.flink.api.scala.batch.sql.SortITCase.testOrderByMultipleFieldsWithSql(SortITCase.scala:56) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not write the user code wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:888) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:281) > ... 
51 more > Caused by: java.io.NotSerializableException: org.apache.flink.api.table.Row > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputSt
[jira] [Commented] (FLINK-4605) Add an expression that returns the return type of an expression
[ https://issues.apache.org/jira/browse/FLINK-4605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515702#comment-15515702 ] Jark Wu commented on FLINK-4605: Hi [~twalthr], could you explain this issue in a little more detail? What is the {{myLong.getType}} return type, a String or an Expression? > Add an expression that returns the return type of an expression > --- > > Key: FLINK-4605 > URL: https://issues.apache.org/jira/browse/FLINK-4605 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther > > Esp. for Java users of the Table API it is hard to obtain the return type of > an expression. I propose to implement an expression that returns the type of > an input expression as a string. > {{myLong.getType()}} could call the toString method of TypeInformation. > This could also be useful to distinguish between different subtypes of POJOs > etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4621) Improve decimal literals of SQL API
[ https://issues.apache.org/jira/browse/FLINK-4621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515830#comment-15515830 ] Jark Wu commented on FLINK-4621: I think the problem is how to convert a BigDecimal into the corresponding Java primitive. Do you have any ideas? > Improve decimal literals of SQL API > --- > > Key: FLINK-4621 > URL: https://issues.apache.org/jira/browse/FLINK-4621 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > > Currently, all SQL {{DECIMAL}} types are converted to BigDecimals internally. > By default, the SQL parser creates {{DECIMAL}} literals of any number e.g. > {{SELECT 1.0, 12, -0.5 FROM x}}. I think it would be better if these simple > numbers would be represented as Java primitives instead of objects. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
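One way the conversion question could be approached, purely as an illustrative heuristic and not Flink's actual rule, is to narrow a {{DECIMAL}} literal to a primitive only when the conversion is exact:

```java
import java.math.BigDecimal;

public class DecimalLiterals {
    /**
     * Returns the narrowest exact Java value for a SQL literal,
     * or the BigDecimal itself if no exact primitive exists.
     * Illustrative sketch only, not Flink's actual conversion rule.
     */
    static Object narrow(BigDecimal d) {
        // strip trailing zeros so 1.0 is treated like the integer 1
        BigDecimal v = d.stripTrailingZeros();
        if (v.scale() <= 0) { // no fractional part -> try integer types
            try {
                return v.intValueExact(); // throws ArithmeticException on int overflow
            } catch (ArithmeticException e) {
                try {
                    return v.longValueExact();
                } catch (ArithmeticException e2) {
                    return d; // too big even for long: keep BigDecimal
                }
            }
        }
        double asDouble = v.doubleValue();
        // only use double when the round-trip is exact, otherwise keep BigDecimal
        if (new BigDecimal(asDouble).compareTo(v) == 0) {
            return asDouble;
        }
        return d;
    }
}
```

Under this sketch the literals from the issue, {{1.0}}, {{12}} and {{-0.5}}, would all become primitives, while something like {{0.1}} (not exactly representable as a binary double in all scales) could stay a BigDecimal.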
[jira] [Closed] (FLINK-6259) Fix a small spelling error
[ https://issues.apache.org/jira/browse/FLINK-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-6259. -- Resolution: Fixed Fix Version/s: 1.3.0 fixed in cae4976a4b9d4fa67f207dd08b8c9480c6f8989b > Fix a small spelling error > -- > > Key: FLINK-6259 > URL: https://issues.apache.org/jira/browse/FLINK-6259 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > flink-gelly-scala/pom.xml {{har-with-dependencies}} -> > {{jar-with-dependencies}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4692) Add tumbling and sliding group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652869#comment-15652869 ] Jark Wu commented on FLINK-4692: Yes, I agree to move the sliding window to a separate issue, and we can discuss the implementation in more detail there. Option 2 is nicer but only supports combinable aggregations. Maybe we can implement approach-1 in the first version and improve it in later issues. > Add tumbling and sliding group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther > > Add Tumble and Slide group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5047) Add tumbling group-windows for batch tables
Jark Wu created FLINK-5047: -- Summary: Add tumbling group-windows for batch tables Key: FLINK-5047 URL: https://issues.apache.org/jira/browse/FLINK-5047 Project: Flink Issue Type: Sub-task Reporter: Jark Wu Add Slide group-windows for batch tables as described in [FLIP-11](https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations). There are two ways to implement sliding windows for batch: 1. replicate the output in order to assign keys for overlapping windows. This is probably the more straight-forward implementation and supports any aggregation function but blows up the data volume. 2. if the aggregation functions are combinable / pre-aggregatable, we can also find the largest tumbling window size from which the sliding windows can be assembled. This is basically the technique used to express sliding windows with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 minutes, 2 minutes) this would mean to first compute aggregates of non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of these into a sliding window (could be done in a MapPartition with sorted input). The implementation could be done as an optimizer rule to split the sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe it makes sense to implement the WINDOW clause first and reuse this for sliding windows. see FLINK-4692 for more discussion -- This message was sent by Atlassian JIRA (v6.3.4#6332)
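The pane-based technique of approach 2 above (a Slide(10 minutes, 2 minutes) window assembled from 5 consecutive 2-minute tumbling pre-aggregates) can be sketched for a combinable aggregate like SUM. This is only an illustration of the idea, not the proposed optimizer rule:

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingFromPanes {
    /**
     * Assembles sliding-window sums from `paneCount` consecutive tumbling
     * pre-aggregates, e.g. 2-minute panes combined into 10-minute sliding
     * windows (paneCount = 5). Works only for combinable aggregates.
     */
    static List<Long> slidingSums(long[] paneSums, int paneCount) {
        List<Long> out = new ArrayList<>();
        for (int start = 0; start + paneCount <= paneSums.length; start++) {
            long sum = 0;
            for (int i = start; i < start + paneCount; i++) {
                sum += paneSums[i]; // combine pre-aggregated panes, not raw rows
            }
            out.add(sum);
        }
        return out;
    }
}
```

The data-volume advantage over approach 1 is that each input row is touched once (during pane pre-aggregation); only the much smaller pane results are combined per sliding window.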
[jira] [Updated] (FLINK-5047) Add sliding group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-5047: --- Summary: Add sliding group-windows for batch tables (was: Add Sliding group-windows for batch tables) > Add sliding group-windows for batch tables > -- > > Key: FLINK-5047 > URL: https://issues.apache.org/jira/browse/FLINK-5047 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Jark Wu > > Add Slide group-windows for batch tables as described in > [FLIP-11](https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations). > There are two ways to implement sliding windows for batch: > 1. replicate the output in order to assign keys for overlapping windows. This > is probably the more straight-forward implementation and supports any > aggregation function but blows up the data volume. > 2. if the aggregation functions are combinable / pre-aggregatable, we can > also find the largest tumbling window size from which the sliding windows can > be assembled. This is basically the technique used to express sliding windows > with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 > minutes, 2 minutes) this would mean to first compute aggregates of > non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of > these into a sliding window (could be done in a MapPartition with sorted > input). The implementation could be done as an optimizer rule to split the > sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe > it makes sense to implement the WINDOW clause first and reuse this for > sliding windows. > see FLINK-4692 for more discussion -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5047) Add sliding group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-5047: --- Description: Add Slide group-windows for batch tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. There are two ways to implement sliding windows for batch: 1. replicate the output in order to assign keys for overlapping windows. This is probably the more straight-forward implementation and supports any aggregation function but blows up the data volume. 2. if the aggregation functions are combinable / pre-aggregatable, we can also find the largest tumbling window size from which the sliding windows can be assembled. This is basically the technique used to express sliding windows with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 minutes, 2 minutes) this would mean to first compute aggregates of non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of these into a sliding window (could be done in a MapPartition with sorted input). The implementation could be done as an optimizer rule to split the sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe it makes sense to implement the WINDOW clause first and reuse this for sliding windows. see FLINK-4692 for more discussion was: Add Slide group-windows for batch tables as described in [FLIP-11](https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations). There are two ways to implement sliding windows for batch: 1. replicate the output in order to assign keys for overlapping windows. This is probably the more straight-forward implementation and supports any aggregation function but blows up the data volume. 2. if the aggregation functions are combinable / pre-aggregatable, we can also find the largest tumbling window size from which the sliding windows can be assembled. 
This is basically the technique used to express sliding windows with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 minutes, 2 minutes) this would mean to first compute aggregates of non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of these into a sliding window (could be done in a MapPartition with sorted input). The implementation could be done as an optimizer rule to split the sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe it makes sense to implement the WINDOW clause first and reuse this for sliding windows. see FLINK-4692 for more discussion > Add sliding group-windows for batch tables > -- > > Key: FLINK-5047 > URL: https://issues.apache.org/jira/browse/FLINK-5047 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Jark Wu > > Add Slide group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > There are two ways to implement sliding windows for batch: > 1. replicate the output in order to assign keys for overlapping windows. This > is probably the more straight-forward implementation and supports any > aggregation function but blows up the data volume. > 2. if the aggregation functions are combinable / pre-aggregatable, we can > also find the largest tumbling window size from which the sliding windows can > be assembled. This is basically the technique used to express sliding windows > with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 > minutes, 2 minutes) this would mean to first compute aggregates of > non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of > these into a sliding window (could be done in a MapPartition with sorted > input). The implementation could be done as an optimizer rule to split the > sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. 
Maybe > it makes sense to implement the WINDOW clause first and reuse this for > sliding windows. > see FLINK-4692 for more discussion -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5047) Add Sliding group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-5047: --- Summary: Add Sliding group-windows for batch tables (was: Add tumbling group-windows for batch tables) > Add Sliding group-windows for batch tables > -- > > Key: FLINK-5047 > URL: https://issues.apache.org/jira/browse/FLINK-5047 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Jark Wu > > Add Slide group-windows for batch tables as described in > [FLIP-11](https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations). > There are two ways to implement sliding windows for batch: > 1. replicate the output in order to assign keys for overlapping windows. This > is probably the more straight-forward implementation and supports any > aggregation function but blows up the data volume. > 2. if the aggregation functions are combinable / pre-aggregatable, we can > also find the largest tumbling window size from which the sliding windows can > be assembled. This is basically the technique used to express sliding windows > with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 > minutes, 2 minutes) this would mean to first compute aggregates of > non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of > these into a sliding window (could be done in a MapPartition with sorted > input). The implementation could be done as an optimizer rule to split the > sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe > it makes sense to implement the WINDOW clause first and reuse this for > sliding windows. > see FLINK-4692 for more discussion -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4692) Add tumbling and sliding group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-4692: --- Description: Add Tumble group-windows for batch tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. (was: Add Tumble and Slide group-windows for batch tables as described in [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. ) > Add tumbling and sliding group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther > > Add Tumble group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4692) Add tumbling group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-4692: --- Summary: Add tumbling group-windows for batch tables (was: Add tumbling and sliding group-windows for batch tables) > Add tumbling group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther > > Add Tumble group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4692) Add tumbling group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652887#comment-15652887 ] Jark Wu commented on FLINK-4692: Hi guys, I moved the sliding window into FLINK-5047 and kept this issue only for the tumbling window. I suggest continuing the discussion of the sliding window implementation under FLINK-5047. > Add tumbling group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther > > Add Tumble group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653869#comment-15653869 ] Jark Wu commented on FLINK-5047: Hi [~fhueske], agreed. I think the first approach can be easily supported after FLINK-4692 is resolved. Regarding {quote} it a) is based on the implementation that supports non-combinable aggregates (which is required in any case) {quote} If I understand correctly, the third approach doesn't support non-combinable aggregates such as median, right? It's only a pre-aggregation optimization that improves on approach-2, right? > Add sliding group-windows for batch tables > -- > > Key: FLINK-5047 > URL: https://issues.apache.org/jira/browse/FLINK-5047 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Jark Wu > > Add Slide group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > There are two ways to implement sliding windows for batch: > 1. replicate the output in order to assign keys for overlapping windows. This > is probably the more straight-forward implementation and supports any > aggregation function but blows up the data volume. > 2. if the aggregation functions are combinable / pre-aggregatable, we can > also find the largest tumbling window size from which the sliding windows can > be assembled. This is basically the technique used to express sliding windows > with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 > minutes, 2 minutes) this would mean to first compute aggregates of > non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of > these into a sliding window (could be done in a MapPartition with sorted > input). The implementation could be done as an optimizer rule to split the > sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. 
Maybe > it makes sense to implement the WINDOW clause first and reuse this for > sliding windows. > 3. There is also a third, hybrid solution: Doing the pre-aggregation on the > largest non-overlapping windows (as in 2) and replicating these results and > processing those as in the 1) approach. The benefits of this is that it a) is > based on the implementation that supports non-combinable aggregates (which is > required in any case) and b) that it does not require the implementation of > the SQL WINDOW operator. Internally, this can be implemented again as an > optimizer rule that translates the SlidingWindow into a pre-aggregating > TublingWindow and a final SlidingWindow (with replication). > see FLINK-4692 for more discussion -- This message was sent by Atlassian JIRA (v6.3.4#6332)
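The hybrid approach 3 can be sketched as two steps: pre-aggregate into non-overlapping panes, then replicate each pane result into every sliding window that covers it. The SUM combiner and the concrete window parameters below are illustrative assumptions, not the proposed implementation:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HybridSlidingWindows {
    /**
     * events: (timestamp, value) pairs. Returns windowStart -> SUM for a
     * Slide(size, slide) window, where size is a multiple of slide (the
     * pane width). Partial windows at the boundaries are included.
     */
    static Map<Long, Long> slidingSum(List<long[]> events, long size, long slide) {
        // step 1: pre-aggregate into non-overlapping panes of width `slide`
        // (the tumbling pre-aggregation from approach 2)
        Map<Long, Long> panes = new TreeMap<>();
        for (long[] e : events) {
            long paneStart = e[0] - Math.floorMod(e[0], slide);
            panes.merge(paneStart, e[1], Long::sum);
        }
        // step 2: replicate each pane result into every sliding window
        // covering it (the replication from approach 1, on far less data)
        Map<Long, Long> windows = new TreeMap<>();
        for (Map.Entry<Long, Long> pane : panes.entrySet()) {
            for (long start = pane.getKey() - size + slide; start <= pane.getKey(); start += slide) {
                windows.merge(start, pane.getValue(), Long::sum);
            }
        }
        return windows;
    }
}
```

Each pane result is replicated size/slide times, so the blow-up of approach 1 applies only to the pre-aggregated panes rather than to every input row.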
[jira] [Assigned] (FLINK-4692) Add tumbling group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4692: -- Assignee: Jark Wu > Add tumbling group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > Add Tumble group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653975#comment-15653975 ] Jark Wu commented on FLINK-5047: Make sense. I prefer the third approach too. > Add sliding group-windows for batch tables > -- > > Key: FLINK-5047 > URL: https://issues.apache.org/jira/browse/FLINK-5047 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Jark Wu > > Add Slide group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > There are two ways to implement sliding windows for batch: > 1. replicate the output in order to assign keys for overlapping windows. This > is probably the more straight-forward implementation and supports any > aggregation function but blows up the data volume. > 2. if the aggregation functions are combinable / pre-aggregatable, we can > also find the largest tumbling window size from which the sliding windows can > be assembled. This is basically the technique used to express sliding windows > with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 > minutes, 2 minutes) this would mean to first compute aggregates of > non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of > these into a sliding window (could be done in a MapPartition with sorted > input). The implementation could be done as an optimizer rule to split the > sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe > it makes sense to implement the WINDOW clause first and reuse this for > sliding windows. > 3. There is also a third, hybrid solution: Doing the pre-aggregation on the > largest non-overlapping windows (as in 2) and replicating these results and > processing those as in the 1) approach. 
The benefits of this is that it a) is > based on the implementation that supports non-combinable aggregates (which is > required in any case) and b) that it does not require the implementation of > the SQL WINDOW operator. Internally, this can be implemented again as an > optimizer rule that translates the SlidingWindow into a pre-aggregating > TublingWindow and a final SlidingWindow (with replication). > see FLINK-4692 for more discussion -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654209#comment-15654209 ] Jark Wu commented on FLINK-4565: Hi [~nvasilishin], regarding the second problem, I just found that we only need to add an entry {{"in" -> classOf[In]}} to {{FunctionCatalog.builtInFunctions}}; there is no need to modify {{ExpressionParser}}. That fixes the problem. SUM, AVG and COUNT take no parameters and can omit the parentheses, but a suffixFunctionCall must have parentheses and is varargs. So for some special functions, we need to implement their "lazy val functions". > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Nikolay Vasilishin > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15655847#comment-15655847 ] Jark Wu commented on FLINK-4565: Thank you [~fhueske] for the detailed internals. Regarding subissue-1, could NOT IN with fewer than 20 literals be handled together with this issue, since they can share a lot of common code? Regarding subissue-2 and -3, I'm afraid they will involve a cross join too. As you mentioned in this [mailing list|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-4541-Support-for-SQL-NOT-IN-operator-td14464.html], you would like to avoid introducing a cross join. So will subissue-2 and -3 not be supported at this point? > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Nikolay Vasilishin > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4679) Add TumbleRow row-windows for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676708#comment-15676708 ] Jark Wu commented on FLINK-4679: Hi [~fhueske] [~twalthr], if I understand correctly, the row-window will evaluate the aggregates every time a row comes in the window. I think it is really like window early-fire which is controlled by Trigger. Could we implement some specific Trigger to fire on every element and then no custom stream operator needed ? Have I missed anything? The Row-count row-window trigger could be like this : {code} class RowWindowCountTrigger[W <: Window](maxCount: Long) extends Trigger[Any, W] { val stateDesc = new ReducingStateDescriptor[JLong]("count", Sum, LongSerializer.INSTANCE) override def onElement(element: Any, timestamp: Long, window: W, ctx: TriggerContext) : TriggerResult = { val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc) count.add(1L) if (count.get >= maxCount) { count.clear() TriggerResult.FIRE_AND_PURGE } else { TriggerResult.FIRE } } override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = TriggerResult.CONTINUE override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = TriggerResult.CONTINUE override def clear(window: W, ctx: TriggerContext): Unit = ctx.getPartitionedState(stateDesc).clear() @SerialVersionUID(1L) object Sum extends ReduceFunction[JLong] { @throws[Exception] def reduce(value1: JLong, value2: JLong): JLong = value1 + value2 } } {code} > Add TumbleRow row-windows for streaming tables > -- > > Key: FLINK-4679 > URL: https://issues.apache.org/jira/browse/FLINK-4679 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Jark Wu > > Add TumbleRow row-windows for streaming tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. 
> > This task requires to implement a custom stream operator and integrate it > with checkpointing and timestamp / watermark logic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
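The counting logic of the trigger proposed in the comment can be simulated without any Flink dependency. The sketch below is a plain-Scala stand-in (the Flink Trigger machinery and ReducingState are replaced by a simple var; all names are illustrative):

```scala
// Plain-Scala simulation of the row-count trigger logic from the comment:
// FIRE on every element (early-fire), and FIRE_AND_PURGE once maxCount
// elements have arrived in the window.
sealed trait TriggerResult
case object Fire extends TriggerResult
case object FireAndPurge extends TriggerResult

class CountingTrigger(maxCount: Long) {
  private var count = 0L // stands in for the ReducingState in the real trigger

  def onElement(): TriggerResult = {
    count += 1
    if (count >= maxCount) {
      count = 0 // corresponds to count.clear()
      FireAndPurge
    } else {
      Fire
    }
  }
}
```

Fed three elements with maxCount = 3, the trigger early-fires twice and fires-and-purges on the third, which matches the intended row-window semantics.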
[jira] [Commented] (FLINK-5094) Support RichReduceFunction and RichFoldFunction as incremental window aggregation functions
[ https://issues.apache.org/jira/browse/FLINK-5094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676759#comment-15676759 ] Jark Wu commented on FLINK-5094: Hi [~fhueske], this may require modifying the implementations of {{FoldingState}} and {{ReducingState}}, i.e. {{HeapReducingState}} and {{RocksDBReducingState}}, right? > Support RichReduceFunction and RichFoldFunction as incremental window > aggregation functions > --- > > Key: FLINK-5094 > URL: https://issues.apache.org/jira/browse/FLINK-5094 > Project: Flink > Issue Type: Improvement > Components: Streaming, Windowing Operators >Affects Versions: 1.2.0, 1.1.3 >Reporter: Fabian Hueske > > Support {{RichReduceFunction}} and {{RichFoldFunction}} as incremental window > aggregation functions in order to initialize the functions via {{open()}}. > The main problem is that we do not want to provide the full power of > {{RichFunction}} for incremental aggregation functions, such as defining own > operator state. This could be achieved by providing some kind of > {{RestrictedRuntimeContext}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
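One way to read the {{RestrictedRuntimeContext}} idea from the issue is a wrapper that exposes only the safe parts of the runtime context while rejecting operator-state registration. This is a hypothetical plain-Scala sketch, not Flink API (FullContext, RestrictedContext, and registerState are invented names):

```scala
// Hypothetical sketch of the RestrictedRuntimeContext idea: expose
// read-only information (e.g. the subtask index) but refuse state
// registration, so an incremental aggregation function could still be
// initialized via open() without gaining access to operator state.
class FullContext(val taskIndex: Int) {
  def registerState(name: String): Unit = () // full power, allowed elsewhere
}

class RestrictedContext(underlying: FullContext) {
  def taskIndex: Int = underlying.taskIndex // safe, read-only accessor
  def registerState(name: String): Nothing =
    throw new UnsupportedOperationException(
      "Operator state is not available for incremental aggregation functions")
}
```

A RichReduceFunction's open() would then receive the restricted wrapper instead of the full context.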
[jira] [Closed] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-4591. -- Resolution: Won't Fix > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > It would be consistent if this also worked: > {{table.groupBy('*).select('*)}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
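The consistency argument in the issue boils down to a star-expansion step: {{'*}} rewrites to the full column list before validation, wherever it appears. A toy sketch of that rewrite (not the Table API internals):

```scala
// Toy star expansion: replace "*" with all column names of the table,
// mirroring how groupBy('*).select('*) would be resolved if the star
// were supported uniformly.
def expandStar(exprs: Seq[String], columns: Seq[String]): Seq[String] =
  exprs.flatMap {
    case "*"   => columns    // star expands to every column of the table
    case other => Seq(other) // other expressions pass through unchanged
  }
```

With columns a, b, c, expanding Seq("*") yields Seq("a", "b", "c"), and the same rule would apply inside a groupBy list.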
[jira] [Commented] (FLINK-4692) Add tumbling group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15698090#comment-15698090 ] Jark Wu commented on FLINK-4692: Hi [~fhueske] [~twalthr], I have proposed a [design doc|https://docs.google.com/document/d/1lzpnNUmNzn9yuCGf1RSjHuHAWm-O_v2in7y90muXI2o/edit#] for this issue and made a prototype. Could you have a look at the design? Any feedback is welcome! > Add tumbling group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > Add Tumble group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4692) Add tumbling group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15698090#comment-15698090 ] Jark Wu edited comment on FLINK-4692 at 11/29/16 1:49 AM: -- Hi [~fhueske] [~twalthr], I have proposed a [design doc|https://docs.google.com/document/d/1lzpnNUmNzn9yuCGf1RSjHuHAWm-O_v2in7y90muXI2o/edit?usp=sharing] for this issue and made a prototype. Could you have a look at the design? Any feedback is welcome! was (Author: jark): Hi [~fhueske] [~twalthr], I have proposed a [design doc|https://docs.google.com/document/d/1lzpnNUmNzn9yuCGf1RSjHuHAWm-O_v2in7y90muXI2o/edit#] for this issue and made a prototype. Could you have a look at the design ? Any feedbacks are welcome! > Add tumbling group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > Add Tumble group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5223) Add documentation of UDTF in Table API & SQL
Jark Wu created FLINK-5223: -- Summary: Add documentation of UDTF in Table API & SQL Key: FLINK-5223 URL: https://issues.apache.org/jira/browse/FLINK-5223 Project: Flink Issue Type: Bug Reporter: Jark Wu -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5223) Add documentation of UDTF in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-5223: --- Issue Type: Improvement (was: Bug) > Add documentation of UDTF in Table API & SQL > > > Key: FLINK-5223 > URL: https://issues.apache.org/jira/browse/FLINK-5223 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5223) Add documentation of UDTF in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-5223: --- Component/s: Table API & SQL > Add documentation of UDTF in Table API & SQL > > > Key: FLINK-5223 > URL: https://issues.apache.org/jira/browse/FLINK-5223 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Jark Wu > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5224) Improve UDTF: emit rows directly instead of buffering them
[ https://issues.apache.org/jira/browse/FLINK-5224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-5224: -- Assignee: Jark Wu > Improve UDTF: emit rows directly instead of buffering them > -- > > Key: FLINK-5224 > URL: https://issues.apache.org/jira/browse/FLINK-5224 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > This requires code-generating a {{Collector}} and registering it in the > {{TableFunction}} instance, so that the rows generated by the UDTF are emitted > directly instead of being buffered. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5224) Improve UDTF: emit rows directly instead of buffering them
Jark Wu created FLINK-5224: -- Summary: Improve UDTF: emit rows directly instead of buffering them Key: FLINK-5224 URL: https://issues.apache.org/jira/browse/FLINK-5224 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Jark Wu This requires code-generating a {{Collector}} and registering it in the {{TableFunction}} instance, so that the rows generated by the UDTF are emitted directly instead of being buffered. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
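The "emit directly" idea can be pictured with a small plain-Scala sketch: the runtime injects a collector into the table function, and eval() forwards each row as it is produced instead of appending to an internal buffer. All names here (EmittingTableFunction, setCollector, Split) are illustrative, not Flink's actual API:

```scala
// Sketch of direct emission for a UDTF: the injected Collector receives
// rows one by one, so no intermediate buffer is needed.
trait Collector[T] { def collect(record: T): Unit }

abstract class EmittingTableFunction[T] {
  private var collector: Collector[T] = _
  def setCollector(c: Collector[T]): Unit = { collector = c }
  protected def collect(row: T): Unit = collector.collect(row) // no buffering
}

// A tiny example UDTF that splits a comma-separated string into rows.
class Split extends EmittingTableFunction[String] {
  def eval(s: String): Unit = s.split(",").foreach(collect)
}
```

In the real implementation the collector would be code-generated, but the data flow is the same: eval() pushes rows downstream as they are created.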
[jira] [Updated] (FLINK-5225) Add interface to override parameter types of UDTF's eval method
[ https://issues.apache.org/jira/browse/FLINK-5225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-5225: --- Component/s: Table API & SQL > Add interface to override parameter types of UDTF's eval method > --- > > Key: FLINK-5225 > URL: https://issues.apache.org/jira/browse/FLINK-5225 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu > > {{ScalarFunction}} has {{getParameterTypes()}} to be overridden if the > parameter types of the eval method can not be determined automatically. This > is missing in {{TableFunction}}. > This requires implementing a custom Calcite {{TableFunction}} and overriding > {{getParameters()}}. But currently {{FlinkTableFunctionImpl}} extends > {{ReflectiveFunctionBase}}, which determines the parameter types of the eval > method. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5225) Add interface to override parameter types of UDTF's eval method
Jark Wu created FLINK-5225: -- Summary: Add interface to override parameter types of UDTF's eval method Key: FLINK-5225 URL: https://issues.apache.org/jira/browse/FLINK-5225 Project: Flink Issue Type: Improvement Reporter: Jark Wu {{ScalarFunction}} has {{getParameterTypes()}} to be overridden if the parameter types of the eval method can not be determined automatically. This is missing in {{TableFunction}}. This requires implementing a custom Calcite {{TableFunction}} and overriding {{getParameters()}}. But currently {{FlinkTableFunctionImpl}} extends {{ReflectiveFunctionBase}}, which determines the parameter types of the eval method. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5187) Create analog of Row in core
[ https://issues.apache.org/jira/browse/FLINK-5187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15731686#comment-15731686 ] Jark Wu commented on FLINK-5187: Hi [~tonycox], have you been working on this issue? If not, would you mind assigning it to me? Actually, I have almost finished it. > Create analog of Row in core > > > Key: FLINK-5187 > URL: https://issues.apache.org/jira/browse/FLINK-5187 > Project: Flink > Issue Type: Sub-task > Components: Core, Table API & SQL >Reporter: Anton Solovev >Assignee: Anton Solovev > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5187) Create analog of Row in core
[ https://issues.apache.org/jira/browse/FLINK-5187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15732044#comment-15732044 ] Jark Wu commented on FLINK-5187: Sure. I will create the PR before tomorrow. Thanks. > Create analog of Row in core > > > Key: FLINK-5187 > URL: https://issues.apache.org/jira/browse/FLINK-5187 > Project: Flink > Issue Type: Sub-task > Components: Core, Table API & SQL >Reporter: Anton Solovev >Assignee: Jark Wu > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5304) Change method name from crossApply to join in Table API
Jark Wu created FLINK-5304: -- Summary: Change method name from crossApply to join in Table API Key: FLINK-5304 URL: https://issues.apache.org/jira/browse/FLINK-5304 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Jark Wu Assignee: Jark Wu Currently, a UDTF in the Table API is used with {{crossApply}}, but is used with JOIN in SQL. A UDTF is something similar to a Table, so it makes sense to use {{.join("udtf(c) as (s)")}} in the Table API too, and join is more familiar to users than {{crossApply}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5304) Change method name from crossApply to join in Table API
[ https://issues.apache.org/jira/browse/FLINK-5304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15734992#comment-15734992 ] Jark Wu commented on FLINK-5304: Yes. {{outerApply}} will be renamed to {{leftOuterJoin}}. > Change method name from crossApply to join in Table API > --- > > Key: FLINK-5304 > URL: https://issues.apache.org/jira/browse/FLINK-5304 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Currently, a UDTF in the Table API is used with {{crossApply}}, but is used > with JOIN in SQL. A UDTF is something similar to a Table, so it makes sense to use > {{.join("udtf(c) as (s)")}} in the Table API too, and join is more familiar to > users than {{crossApply}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5223) Add documentation of UDTF in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-5223: -- Assignee: Jark Wu > Add documentation of UDTF in Table API & SQL > > > Key: FLINK-5223 > URL: https://issues.apache.org/jira/browse/FLINK-5223 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-5225) Add interface to override parameter types of UDTF's eval method
[ https://issues.apache.org/jira/browse/FLINK-5225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-5225: -- Assignee: Jark Wu > Add interface to override parameter types of UDTF's eval method > --- > > Key: FLINK-5225 > URL: https://issues.apache.org/jira/browse/FLINK-5225 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > {{ScalarFunction}} has {{getParameterTypes()}} to be overridden if the > parameter types of the eval method can not be determined automatically. This > is missing in {{TableFunction}}. > This requires implementing a custom Calcite {{TableFunction}} and overriding > {{getParameters()}}. But currently {{FlinkTableFunctionImpl}} extends > {{ReflectiveFunctionBase}}, which determines the parameter types of the eval > method. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15743965#comment-15743965 ] Jark Wu commented on FLINK-5280: Hi [~ivan.mushketyk], I will try to answer your question. The main point of confusion is that {{CodeGenerator}} seemingly doesn't support nested access. Actually, this was fixed in FLINK-4294, and you can have a look at the test example {{CompositeAccessTest}}. I think it will give you some inspiration. The other problem is that {{BatchScan#convertToExpectedType}} converts the input dataset into the Row type. It does not flatten the nested fields, but keeps the same schema in the Row. In your case, the ParentPojo will be converted to a Row type {{Row(child: ChildPojo, num: Int)}}. Hope that helps. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750273#comment-15750273 ] Jark Wu commented on FLINK-5280: Hi [~ivan.mushketyk], thanks for your detailed and clear proposal. Regarding the new argument {{fieldMappings}} in {{FlinkTable}}, I think it plays the same role as {{fieldIndexes}}. Actually, {{fieldIndexes}} is the {{inputPojoFieldMapping}} in {{CodeGenerator}} when converting. In the POJO case, {{fieldIndexes}} is a field mapping; in other cases, it is an array of {{0~n}}. Regarding {{getNumberOfFields}} in {{TableSource}}: yes, it is rarely used and can be replaced by {{getFieldsNames.length}} if {{getFieldsNames}} still exposes the first-level attributes. Hi [~fhueske], I agree with the {{RowTypeInfo}} approach, which I think is similar to Calcite's way. But we should support custom names in {{RowTypeInfo}} first. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15751325#comment-15751325 ] Jark Wu commented on FLINK-5280: Hi [~ivan.mushketyk], Yes, you are right. In your case, the POJO TableSource's {{fieldIndexes}} is not clear. But we can use {{getFieldsNames}} and {{getResultType}} to generate the {{fieldIndexes}}. So the new {{getFieldMapping}} still duplicates {{getFieldsNames}}, am I right? I don't know much about {{GenericRecord}}, maybe [~fhueske] can answer your question. Does {{GenericRecord}} have an immutable schema, or can it change with every record? IMO, the TableSource interface can be simplified to this: {code:scala} trait TableSource[T] { /** Return this table source's row type. The returned RowTypeInfo is a composite type and can have * nested types whose fields describe the names and types of the columns in this table. */ def getReturnType: RowTypeInfo } {code} The {{getReturnType}} is forced to return a RowTypeInfo. It describes the first-level field names and types (possibly nested), so that we can support nested data for a TableSource. But currently, the RowTypeInfo doesn't support custom field names, so we should fix that first. And the original {{getNumberOfFields}}, {{getFieldsNames}}, {{getFieldTypes}} interfaces in {{TableSource}} could be removed, as they can be derived from the returned RowTypeInfo. Finally, it would be similar to Calcite's {{Table}} interface, which actually only has a {{RelDataType getRowType(RelDataTypeFactory typeFactory)}} method to implement. What do you think? 
[~fhueske] [~ivan.mushketyk] > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
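The single-method shape proposed in the comment can be modeled with a toy composite type: a row type carries field names plus (possibly nested) field types, so nesting falls out naturally from one getReturnType-style accessor. This is an illustrative sketch, not Flink's TypeInformation hierarchy:

```scala
// Toy composite type: field names plus (possibly nested) field types,
// mirroring the idea that a single row-type accessor can describe
// nested data and subsume getNumberOfFields / getFieldsNames / getFieldTypes.
sealed trait TypeInfo
case object IntType extends TypeInfo
case object StringType extends TypeInfo
case class RowType(fieldNames: Seq[String], fieldTypes: Seq[TypeInfo]) extends TypeInfo {
  def arity: Int = fieldNames.length // replaces a separate getNumberOfFields
}

// Nested schema for the ParentPojo example: Row(child: Row(name: String), num: Int)
val child = RowType(Seq("name"), Seq(StringType))
val parent = RowType(Seq("child", "num"), Seq(child, IntType))
```

Because a field type can itself be a RowType, the same structure describes both flat and nested sources, which is the crux of the simplification argument.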
[jira] [Created] (FLINK-5348) Support custom field names for RowTypeInfo
Jark Wu created FLINK-5348: -- Summary: Support custom field names for RowTypeInfo Key: FLINK-5348 URL: https://issues.apache.org/jira/browse/FLINK-5348 Project: Flink Issue Type: Improvement Components: Core Reporter: Jark Wu Assignee: Jark Wu Currently, RowTypeInfo doesn't support optional custom field names, but is forced to generate {{f0}} ~ {{fn}} as field names. It would be better to support custom names, which would benefit some cases (e.g. FLINK-5280). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
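The default-vs-custom naming behaviour described in the issue can be sketched in a few lines of plain Scala (NamedRowType is an invented name, not the real RowTypeInfo):

```scala
// Sketch of the proposed behaviour: when no names are supplied,
// fall back to the generated defaults f0 ... fn.
class NamedRowType(arity: Int, customNames: Option[Seq[String]] = None) {
  require(customNames.forall(_.length == arity), "one name per field")
  val fieldNames: Seq[String] =
    customNames.getOrElse((0 until arity).map(i => s"f$i"))
}
```

A source like the one in FLINK-5280 could then pass Some(Seq("name", "age")) while existing callers keep the f0-style defaults unchanged.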
[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15753315#comment-15753315 ] Jark Wu commented on FLINK-5280: Row and RowTypeInfo have been moved to flink-core, so I would suggest doing this in a separate issue. I created FLINK-5348 for it. > Extend TableSource to support nested data > - > > Key: FLINK-5280 > URL: https://issues.apache.org/jira/browse/FLINK-5280 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > The {{TableSource}} interface does currently only support the definition of > flat rows. > However, there are several storage formats for nested data that should be > supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can > also natively handle nested rows. > The {{TableSource}} interface and the code to register table sources in > Calcite's schema need to be extended to support nested data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5358) Support RowTypeInfo extraction in TypeExtractor by Row instance
[ https://issues.apache.org/jira/browse/FLINK-5358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15761026#comment-15761026 ] Jark Wu commented on FLINK-5358: If one of the fields is null, what is the {{TypeInformation}} of this field? > Support RowTypeInfo extraction in TypeExtractor by Row instance > --- > > Key: FLINK-5358 > URL: https://issues.apache.org/jira/browse/FLINK-5358 > Project: Flink > Issue Type: Improvement >Reporter: Anton Solovev >Assignee: Anton Solovev > > {code} > Row[] data = new Row[]{}; > TypeInformation typeInfo = TypeExtractor.getForObject(data[0]); > {code} > method {{getForObject}} wraps it into > {code} > GenericTypeInfo > {code} > the method should return {{RowTypeInfo}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4109) Change the name of ternary condition operator 'eval' to '?'
Jark Wu created FLINK-4109: -- Summary: Change the name of ternary condition operator 'eval' to '?' Key: FLINK-4109 URL: https://issues.apache.org/jira/browse/FLINK-4109 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.1.0 Reporter: Jark Wu Assignee: Jark Wu The ternary condition operator in the Table API is named {{eval}}, for example: {{(42 > 5).eval("A", "B")}} leads to "A". IMO, the name eval is not well understood. I think "?" is a better choice, as it is the conditional operator in Java. It would be clearer and more literally understood, e.g. {{(42 > 5).?("A", "B")}} or {{(42 > 5) ? ("A", "B")}} If this makes sense, I will open a pull request. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
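The proposed surface syntax can be demonstrated in plain Scala with an implicit class that adds a `?` method to Boolean. This is a standalone sketch of the naming idea only; in the real Table API the operands are expressions, not evaluated values:

```scala
// Sketch of the proposed syntax: an implicit class gives Boolean a
// `?` method, so (42 > 5).?("A", "B") reads like Java's ternary operator.
implicit class TernaryOps(cond: Boolean) {
  def ?[T](ifTrue: T, ifFalse: T): T = if (cond) ifTrue else ifFalse
}
```

With this in scope, `(42 > 5).?("A", "B")` selects the first branch, mirroring the example in the issue.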
[jira] [Created] (FLINK-4111) Flink Table & SQL doesn't work in very simple example
Jark Wu created FLINK-4111: -- Summary: Flink Table & SQL doesn't work in very simple example Key: FLINK-4111 URL: https://issues.apache.org/jira/browse/FLINK-4111 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.1.0 Reporter: Jark Wu Fix For: 1.1.0 I'm trying to use Flink Table 1.1-SNAPSHOT, where I want to use the Table API and SQL in my project. But when I run the very simple WordCountTable example, I encountered the following exception: {code} Exception in thread "main" java.lang.NoSuchMethodError: org.apache.calcite.rel.logical.LogicalAggregate.getGroupSets()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList; at org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule.matches(DataSetAggregateRule.scala:47) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:269) at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:253) at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1542) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1817) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058) at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:723) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:331) at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:250) at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) at com.alibaba.flink.examples.WordCountTable$.main(WordCountTable.scala:43) at com.alibaba.flink.examples.WordCountTable.main(WordCountTable.scala) {code} It seems that something is wrong with our Guava shading. Do you have any ideas? 
My pom file and WordCountTable.scala are [here|https://gist.github.com/wuchong/9c1c0df3cb7453502abc4605f5347289]. And I found someone with the same problem on Stack Overflow: [http://stackoverflow.com/questions/37835408/org-apache-flink-api-table-tableexception-alias-on-field-reference-expression-e#comment63160086_37838816] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4109) Change the name of ternary condition operator 'eval' to '?'
[ https://issues.apache.org/jira/browse/FLINK-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15347564#comment-15347564 ] Jark Wu commented on FLINK-4109: [~vkalavri] [~chengxiang li] [~rmetzger] What do you think? > Change the name of ternary condition operator 'eval' to '?' > --- > > Key: FLINK-4109 > URL: https://issues.apache.org/jira/browse/FLINK-4109 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Jark Wu >Assignee: Jark Wu > Fix For: 1.1.0 > > > The ternary condition operator in the Table API is named {{eval}}, for example: > {{(42 > 5).eval("A", "B")}} leads to "A". IMO, the name eval is not well > understood. I think "?" is a better choice, as it is the conditional operator in Java. > It would be clearer and more literally understood, e.g. > {{(42 > 5).?("A", "B")}} or {{(42 > 5) ? ("A", "B")}} > If this makes sense, I will open a pull request. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4130) CallGenerator could generate illegal code when taking no operands
[ https://issues.apache.org/jira/browse/FLINK-4130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356537#comment-15356537 ] Jark Wu commented on FLINK-4130: It seems that Cody has opened a pull request for this issue: https://github.com/apache/flink/pull/2182 > CallGenerator could generate illegal code when taking no operands > - > > Key: FLINK-4130 > URL: https://issues.apache.org/jira/browse/FLINK-4130 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Cody >Priority: Minor > > In CallGenerator, when a call takes no operands, and null check is enabled, > it will generate code like: > boolean isNull$17 = ; > which will fail to compile at runtime. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3792) RowTypeInfo equality should not depend on field names
[ https://issues.apache.org/jira/browse/FLINK-3792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356552#comment-15356552 ] Jark Wu commented on FLINK-3792: Do we need this before the 1.1.0 release? If yes, I can take this issue. We can modify the {{RowTypeInfo}} constructor to take only a fieldTypes parameter and create default names like ("f0", "f1", ...), because for Row we only need to check types and don't need to pass a fieldNames parameter to RowTypeInfo. > RowTypeInfo equality should not depend on field names > - > > Key: FLINK-3792 > URL: https://issues.apache.org/jira/browse/FLINK-3792 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri > > Currently, two Rows with the same field types but different field names are > not considered equal by the Table API and SQL. This behavior might create > problems, e.g. it makes the following union query fail: > {code} > SELECT STREAM a, b, c FROM T1 UNION ALL > (SELECT STREAM d, e, f FROM T2 WHERE d < 3) > {code} > where a, b, c and d, e, f are fields of corresponding types. > {code} > Cannot union streams of different types: org.apache.flink.api.table.Row(a: > Integer, b: Long, c: String) and org.apache.flink.api.table.Row(d: Integer, > e: Long, f: String) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
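The name-insensitive equality requested in the issue can be sketched in plain Scala: two row types compare equal when their field types match, regardless of field names. SimpleRowType is an invented stand-in for the real RowTypeInfo:

```scala
// Sketch of name-insensitive equality for a row type: only the field
// types participate in equals/hashCode, so Row(a: Int, b: String) and
// Row(d: Int, e: String) compare equal and the union query type-checks.
case class SimpleRowType(fieldNames: Seq[String], fieldTypes: Seq[String]) {
  override def equals(other: Any): Boolean = other match {
    case that: SimpleRowType => this.fieldTypes == that.fieldTypes // names ignored
    case _ => false
  }
  override def hashCode(): Int = fieldTypes.hashCode() // consistent with equals
}
```

Overriding hashCode alongside equals keeps the type usable as a map key, which matters wherever the planner caches by type.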
[jira] [Assigned] (FLINK-4180) Create a batch SQL example
[ https://issues.apache.org/jira/browse/FLINK-4180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4180: -- Assignee: Jark Wu > Create a batch SQL example > -- > > Key: FLINK-4180 > URL: https://issues.apache.org/jira/browse/FLINK-4180 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > Labels: starter > > Currently there is no runnable code example in `flink-table` showing a > working batch SQL query with the Table API. > A Scala and Java example should be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4181) Add a basic streaming Table API example
[ https://issues.apache.org/jira/browse/FLINK-4181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4181: -- Assignee: Jark Wu > Add a basic streaming Table API example > --- > > Key: FLINK-4181 > URL: https://issues.apache.org/jira/browse/FLINK-4181 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > Labels: starter > > Although the Table API does not offer much streaming features yet, there > should be a runnable example showing how to convert, union, filter and > project streams with the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4180) Create a batch SQL example
[ https://issues.apache.org/jira/browse/FLINK-4180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367808#comment-15367808 ] Jark Wu commented on FLINK-4180: I'll work on this. > Create a batch SQL example > -- > > Key: FLINK-4180 > URL: https://issues.apache.org/jira/browse/FLINK-4180 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther > Labels: starter > > Currently there is no runnable code example in `flink-table` showing a > working batch SQL query with the Table API. > A Scala and Java example should be added. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4181) Add a basic streaming Table API example
[ https://issues.apache.org/jira/browse/FLINK-4181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367810#comment-15367810 ] Jark Wu commented on FLINK-4181: I'll work on this. > Add a basic streaming Table API example > --- > > Key: FLINK-4181 > URL: https://issues.apache.org/jira/browse/FLINK-4181 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther > Labels: starter > > Although the Table API does not offer much streaming features yet, there > should be a runnable example showing how to convert, union, filter and > project streams with the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4240) Cannot use expressions in Scala Table API's groupBy method
[ https://issues.apache.org/jira/browse/FLINK-4240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387824#comment-15387824 ] Jark Wu commented on FLINK-4240: I don't think this is a bug, as we cannot select a column that is not in the GROUP BY. In this case we can rename the grouping field and it will work: {code} val tblResult = input1.groupBy('a % 4 as 'd).select('d); {code} > Cannot use expressions in Scala Table API's groupBy method > -- > > Key: FLINK-4240 > URL: https://issues.apache.org/jira/browse/FLINK-4240 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann > > The following code fails even though it should be supported according to the > documentation: > {code} > package com.dataartisans.batch > import org.apache.flink.api.scala._ > import org.apache.flink.api.scala.table._ > import org.apache.flink.api.table.{Row, TableConfig, TableEnvironment} > object ScalaSimpleTableAPIJob { > def main(args: Array[String]): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val tblConfig = new TableConfig > val tblEnv = TableEnvironment.getTableEnvironment(env, tblConfig) > val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val inputDS1 = env.fromCollection(input1Seq) > val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c) > // fails with org.apache.flink.api.table.ValidationException: cannot > resolve [a] given input [('a % 4)] > val tblResult = input1.groupBy('a % 4).select('a); > val result = tblEnv.toDataSet[Row](tblResult) > result.print() > } > } > {code} > {code} > Exception in thread "main" org.apache.flink.api.table.ValidationException: > cannot resolve [a] given input [('a % 4)] > at > org.apache.flink.api.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143) > at > org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:87) > at > 
org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:84) > at > org.apache.flink.api.table.trees.TreeNode.postOrderTransform(TreeNode.scala:72) > at > org.apache.flink.api.table.plan.logical.LogicalNode.org$apache$flink$api$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:120) > at > org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:133) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.api.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:132) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > 
org.apache.flink.api.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137) > at > org.apache.flink.api.table.plan.logical.LogicalNode.validate(LogicalNode.scala:84) > at > org.apache.flink.api.table.plan.logical.Project.validate(operators.scala:57) > at org.apache.flink.api.table.GroupedTable.select(table.scala:631) > at > com.dataartisans.batch.ScalaSimpleTableAPIJob$.main(ScalaSimpleTableAPIJob.scala:26) > at > com.dataartisans.batch.ScalaSimpleTableAPIJob.main(ScalaSimpleTableAPIJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(Na
[jira] [Commented] (FLINK-4244) Field names for union operator do not have to be equal
[ https://issues.apache.org/jira/browse/FLINK-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387851#comment-15387851 ] Jark Wu commented on FLINK-4244: Yes, we just merged [FLINK-2985], but did not update the documentation. I will update it. [FLINK-2985] Allow different field names for unionAll() in Table API. > Field names for union operator do not have to be equal > -- > > Key: FLINK-4244 > URL: https://issues.apache.org/jira/browse/FLINK-4244 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Trivial > > Flink Table API's documentation says that the schemas of unioned tables have > to be identical (wrt types and names). However, union works also with tables > where the types are identical but not the names: > {code} > val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val inputDS1 = env.fromCollection(input1Seq) > val inputDS2 = env.fromCollection(input2Seq) > val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c) > tblEnv.registerTable("foobar", input1) > val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f) > tblEnv.registerTable("foobar2", input2) > val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM > foobar2") > tblEnv.toDataSet[Row](result).print() > {code} > We should update the documentation accordingly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4244) Field names for union operator do not have to be equal
[ https://issues.apache.org/jira/browse/FLINK-4244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4244: -- Assignee: Jark Wu > Field names for union operator do not have to be equal > -- > > Key: FLINK-4244 > URL: https://issues.apache.org/jira/browse/FLINK-4244 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Jark Wu >Priority: Trivial > > Flink Table API's documentation says that the schemas of unioned tables have > to be identical (wrt types and names). However, union works also with tables > where the types are identical but not the names: > {code} > val input1Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val input2Seq = 0 until 10 map {x => (x, ('a' + x).toChar.toString, > x.toDouble)} > val inputDS1 = env.fromCollection(input1Seq) > val inputDS2 = env.fromCollection(input2Seq) > val input1 = tblEnv.fromDataSet(inputDS1, 'a, 'b, 'c) > tblEnv.registerTable("foobar", input1) > val input2 = tblEnv.fromDataSet(inputDS2, 'd, 'e, 'f) > tblEnv.registerTable("foobar2", input2) > val result = tblEnv.sql("SELECT * FROM foobar UNION ALL SELECT * FROM > foobar2") > tblEnv.toDataSet[Row](result).print() > {code} > We should update the documentation accordingly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4250) Cannot select other than first column from Table
[ https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388780#comment-15388780 ] Jark Wu commented on FLINK-4250: I think the second exception is similar to the first, because "VALUE" is also a reserved SQL keyword as a part of "NEXT VALUE FOR" which generates a sequence. > Cannot select other than first column from Table > > > Key: FLINK-4250 > URL: https://issues.apache.org/jira/browse/FLINK-4250 > Project: Flink > Issue Type: Bug > Components: Scala API, Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > > Using the Scala Table API and the {{CsvTableSource}} I cannot select a column > from the csv source. The following code: > {code} > package com.dataartisans.batch > import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} > import org.apache.flink.api.scala._ > import org.apache.flink.api.table.sources.CsvTableSource > import org.apache.flink.api.table.{Row, TableEnvironment, Table} > object CsvTableAPIJob { > def main(args: Array[String]): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val csvFilePath = "table-jobs/src/main/resources/input.csv" > val tblEnv = TableEnvironment.getTableEnvironment(env) > val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", > "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO)) > tblEnv.registerTableSource("foobar", csvTS) > val input = tblEnv.sql("SELECT user FROM foobar") > tblEnv.toDataSet[Row](input).print() > } > } > {code} > fails with > {code} > Exception in thread "main" > org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at > 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286) > at > org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21) > at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-4250) Cannot select other than first column from Table
[ https://issues.apache.org/jira/browse/FLINK-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15388780#comment-15388780 ] Jark Wu edited comment on FLINK-4250 at 7/22/16 2:55 AM: - I think the second exception is similar to the first, because "VALUE" is also a reserved SQL keyword as a part of "[NEXT VALUE FOR|https://msdn.microsoft.com/en-us//library/ff878370.aspx]"; which generates a sequence. was (Author: jark): I think the second exception is similar to the first, because "VALUE" is also a reserved SQL keyword as a part of "NEXT VALUE FOR" which generates a sequence. > Cannot select other than first column from Table > > > Key: FLINK-4250 > URL: https://issues.apache.org/jira/browse/FLINK-4250 > Project: Flink > Issue Type: Bug > Components: Scala API, Table API & SQL >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > > Using the Scala Table API and the {{CsvTableSource}} I cannot select a column > from the csv source. 
The following code: > {code} > package com.dataartisans.batch > import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} > import org.apache.flink.api.scala._ > import org.apache.flink.api.table.sources.CsvTableSource > import org.apache.flink.api.table.{Row, TableEnvironment, Table} > object CsvTableAPIJob { > def main(args: Array[String]): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val csvFilePath = "table-jobs/src/main/resources/input.csv" > val tblEnv = TableEnvironment.getTableEnvironment(env) > val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", > "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO)) > tblEnv.registerTableSource("foobar", csvTS) > val input = tblEnv.sql("SELECT user FROM foobar") > tblEnv.toDataSet[Row](input).print() > } > } > {code} > fails with > {code} > Exception in thread "main" > org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782) > at > org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286) > at > org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21) > at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4252) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15389003#comment-15389003 ] Jark Wu commented on FLINK-4252: I debug it and find that we use {Class.getCanonicalName} to get the canonical name of the underlying class which will return null if the underlying class does not have a canonical name (i.e., if it is a local or anonymous class or an array whose component type does not have a canonical name). So you should move the {Dandan} case class definition out of the main function, and it should work. But I haven't found any document telling that we cannot define local case class, so I'm not sure whether it is a bug, [~fhueske] [~twalthr] what do you think? > Table program cannot be compiled > > > Key: FLINK-4252 > URL: https://issues.apache.org/jira/browse/FLINK-4252 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 > Environment: OS X EI Captain > scala 2.11.7 > jdk 8 >Reporter: Renkai Ge > Attachments: TestMain.scala > > > I'm trying the table apis. > I got some errors like this > My code is in the attachments > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. 
> at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) > at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672) > at TestMain$.main(TestMain.scala:31) > at TestMain.main(TestMain.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: The user defined 'open(Configuration)' method > in class org.apache.flink.api.table.runtime.FlatMapRunner caused an > exception: Table program cannot be compiled. This is a bug. Please file an > issue. > at
[jira] [Comment Edited] (FLINK-4252) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15389003#comment-15389003 ] Jark Wu edited comment on FLINK-4252 at 7/22/16 6:35 AM: - I debug it and find that we use {{Class.getCanonicalName}} to get the canonical name of the underlying class which will return null if the underlying class does not have a canonical name (i.e., if it is a local or anonymous class or an array whose component type does not have a canonical name). So you should move the {{Dandan}} case class definition out of the main function, and it should work. But I haven't found any document telling that we cannot define local case class, so I'm not sure whether it is a bug, [~fhueske] [~twalthr] what do you think? was (Author: jark): I debug it and find that we use {Class.getCanonicalName} to get the canonical name of the underlying class which will return null if the underlying class does not have a canonical name (i.e., if it is a local or anonymous class or an array whose component type does not have a canonical name). So you should move the {Dandan} case class definition out of the main function, and it should work. But I haven't found any document telling that we cannot define local case class, so I'm not sure whether it is a bug, [~fhueske] [~twalthr] what do you think? > Table program cannot be compiled > > > Key: FLINK-4252 > URL: https://issues.apache.org/jira/browse/FLINK-4252 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 > Environment: OS X EI Captain > scala 2.11.7 > jdk 8 >Reporter: Renkai Ge > Attachments: TestMain.scala > > > I'm trying the table apis. > I got some errors like this > My code is in the attachments > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. 
> at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) > at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672) > at TestMain$.main(TestMain.scala:31) > at TestMain.main(TestMain.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.jav
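The {{Class.getCanonicalName}} behavior described in the comment above is plain JDK behavior and can be reproduced without Flink. A minimal, self-contained Java sketch (the class names here are invented for illustration):

```java
// Class.getCanonicalName() returns null for local classes (classes declared
// inside a method body). A Scala case class defined inside main() compiles
// to such a local class, which is why the Table API's code generation,
// which relies on the canonical name, fails on it.
public class CanonicalNameDemo {

    // A static member (nested) class has a canonical name.
    static class TopLevelRecord {}

    static String nestedClassCanonicalName() {
        return TopLevelRecord.class.getCanonicalName(); // "CanonicalNameDemo.TopLevelRecord"
    }

    static String localClassCanonicalName() {
        // A local class has no canonical name.
        class LocalRecord {}
        return LocalRecord.class.getCanonicalName(); // null
    }

    public static void main(String[] args) {
        System.out.println(nestedClassCanonicalName());
        System.out.println(localClassCanonicalName());
    }
}
```

Moving the class definition to the top level gives it a canonical name, which is why the suggested workaround of moving the case class out of the main function works.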
[jira] [Commented] (FLINK-4252) Table program cannot be compiled
[ https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15389341#comment-15389341 ] Jark Wu commented on FLINK-4252: Do we have the JIRA? Or just modify this JIRA to track it? > Table program cannot be compiled > > > Key: FLINK-4252 > URL: https://issues.apache.org/jira/browse/FLINK-4252 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 > Environment: OS X EI Captain > scala 2.11.7 > jdk 8 >Reporter: Renkai Ge > Attachments: TestMain.scala > > > I'm trying the table apis. > I got some errors like this > My code is in the attachments > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) > at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672) > at TestMain$.main(TestMain.scala:31) > at TestMain.main(TestMain.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: The user defined 'open(Configuration)' method > in class org.apache.flink.api.table.runtime.FlatMapRunner caused an > 
exception: Table program cannot be compiled. This is a bug. Please file an > issue. > at > org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337) > at > org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47) > at > org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351) > at org.apache.flink.runtime.tas
[jira] [Commented] (FLINK-4203) Improve Table API documentation
[ https://issues.apache.org/jira/browse/FLINK-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15389674#comment-15389674 ] Jark Wu commented on FLINK-4203: We should also improve the [Table API Operators section|https://ci.apache.org/projects/flink/flink-docs-master/apis/table.html#table-api-operators]: add an additional column describing whether each operator is supported in Batch, Streaming, or both. In addition, maybe we should add a new section describing the SQL syntax we support, similar to the Table API Operators section. > Improve Table API documentation > --- > > Key: FLINK-4203 > URL: https://issues.apache.org/jira/browse/FLINK-4203 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: Timo Walther > > Some ideas: > - Add a list of all supported scalar functions and a description > - Add a more advanced example > - Describe supported data types -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4263) SQL's VALUES does not work properly
[ https://issues.apache.org/jira/browse/FLINK-4263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4263: -- Assignee: Jark Wu > SQL's VALUES does not work properly > --- > > Key: FLINK-4263 > URL: https://issues.apache.org/jira/browse/FLINK-4263 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: Jark Wu > > Executing the following SQL leads to very strange output: > {code} > SELECT * > FROM( > VALUES > (1, 2), > (3, 4) > ) AS q (col1, col2)" > {code} > {code} > org.apache.flink.optimizer.CompilerException: Error translating node 'Data > Source "at translateToPlan(DataSetValues.scala:88) > (org.apache.flink.api.table.runtime.ValuesInputFormat)" : NONE [[ > GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties > [ordering=null, grouped=null, unique=null] ]]': Could not write the user code > wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106) > at > org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170) > at > org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76) > at > 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > at > org.apache.flink.api.scala.batch.sql.SortITCase.testOrderByMultipleFieldsWithSql(SortITCase.scala:56) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not write the user code wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:888) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:281) > ... 
51 more > Caused by: java.io.NotSerializableException: org.apache.flink.api.table.Row > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutput
[jira] [Commented] (FLINK-4263) SQL's VALUES does not work properly
[ https://issues.apache.org/jira/browse/FLINK-4263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15393950#comment-15393950 ] Jark Wu commented on FLINK-4263: It seems that ValuesInputFormat has a {{Seq[Row]}} field, but the {{Row}} is not serializable. I will try to fix this. > SQL's VALUES does not work properly > --- > > Key: FLINK-4263 > URL: https://issues.apache.org/jira/browse/FLINK-4263 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Timo Walther > > Executing the following SQL leads to very strange output: > {code} > SELECT * > FROM( > VALUES > (1, 2), > (3, 4) > ) AS q (col1, col2)" > {code} > {code} > org.apache.flink.optimizer.CompilerException: Error translating node 'Data > Source "at translateToPlan(DataSetValues.scala:88) > (org.apache.flink.api.table.runtime.ValuesInputFormat)" : NONE [[ > GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties > [ordering=null, grouped=null, unique=null] ]]': Could not write the user code > wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106) > at > org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192) > at > 
org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170) > at > org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > at > org.apache.flink.api.scala.batch.sql.SortITCase.testOrderByMultipleFieldsWithSql(SortITCase.scala:56) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not write the user code wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:888) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:281) > ... 
51 more > Caused by: java.io.NotSerializableException: org.apache.flink.api.table.Row > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.Obje
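The root cause diagnosed above is plain Java serialization: an object graph can only be serialized if every reachable object is serializable. A minimal, Flink-independent sketch of the failure mode (the class names below are stand-ins, not the real Flink types):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NonSerializableFieldDemo {

    // Stand-in for org.apache.flink.api.table.Row: note the missing
    // Serializable marker interface.
    static class Row {
        final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
    }

    // Stand-in for ValuesInputFormat: serializable itself, but it carries
    // Row objects, which poisons the whole object graph.
    static class ValuesFormat implements Serializable {
        final Row[] rows;
        ValuesFormat(Row... rows) { this.rows = rows; }
    }

    // Returns the name of the exception thrown during serialization,
    // or "ok" if serialization succeeds.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException";
        } catch (java.io.IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    static String demo() {
        return trySerialize(new ValuesFormat(new Row(1, 2), new Row(3, 4)));
    }

    public static void main(String[] args) {
        System.out.println(demo());  // NotSerializableException
    }
}
```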
[jira] [Commented] (FLINK-4263) SQL's VALUES does not work properly
[ https://issues.apache.org/jira/browse/FLINK-4263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396844#comment-15396844 ] Jark Wu commented on FLINK-4263: Hi [~gallenvara_bg], thanks for your advice. But I think overriding {{writeObject}} in {{ValuesInputFormat}} is a little tricky. Actually, we just need to replace the {{Seq[Row]}} field with a {{Seq[List]}} in {{ValuesInputFormat}}. In addition, there are other bugs, such as {code} ClassCastException: java.math.BigDecimal cannot be cast to java.lang.Integer {code} and, if the expected type of the DataSet is not Row but something else (e.g. the WC case class), the following exception will be thrown: {code} java.lang.ClassCastException: org.apache.flink.examples.scala.WordCountSQL$$anon$2 cannot be cast to org.apache.flink.api.table.typeutils.RowTypeInfo at org.apache.flink.api.table.plan.nodes.dataset.DataSetValues.translateToPlan(DataSetValues.scala:74) at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274) at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) at org.apache.flink.examples.scala.WordCountSQL$.main(WordCountSQL.scala:42) at org.apache.flink.examples.scala.WordCountSQL.main(WordCountSQL.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) {code} > SQL's VALUES does not work properly > --- > > Key: FLINK-4263 > URL: https://issues.apache.org/jira/browse/FLINK-4263 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: Jark Wu > > Executing 
the following SQL leads to very strange output: > {code} > SELECT * > FROM( > VALUES > (1, 2), > (3, 4) > ) AS q (col1, col2)" > {code} > {code} > org.apache.flink.optimizer.CompilerException: Error translating node 'Data > Source "at translateToPlan(DataSetValues.scala:88) > (org.apache.flink.api.table.runtime.ValuesInputFormat)" : NONE [[ > GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties > [ordering=null, grouped=null, unique=null] ]]': Could not write the user code > wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:381) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:106) > at > org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192) > at > org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170) > at > org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:76) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:637) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > at > org.apache.flink.api.scala.batch.sql.SortITCase.testOrderByMultipleFieldsWithSql(SortITCase.scala:56) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not write the user code wrapper class > org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : > java.io.NotSerializableException: org.apache.flink.api.table.Row > at > org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279) > at > or
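The fix proposed in the comment above, holding the literal values in a serializable collection instead of a {{Seq[Row]}}, can be sketched outside Flink as follows. The class names are hypothetical stand-ins; the real {{ValuesInputFormat}} differs:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SerializableValuesDemo {

    // Stand-in for the non-serializable Row type.
    static class Row {
        final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
    }

    // The format ships the literal values as serializable lists and only
    // materializes Row objects after deserialization, on first access.
    static class ValuesFormat implements Serializable {
        private final ArrayList<ArrayList<Object>> data;  // serializable payload
        private transient List<Row> rows;                 // rebuilt lazily, never serialized

        ValuesFormat(ArrayList<ArrayList<Object>> data) { this.data = data; }

        List<Row> rows() {
            if (rows == null) {
                rows = new ArrayList<>();
                for (ArrayList<Object> values : data) {
                    rows.add(new Row(values.toArray()));
                }
            }
            return rows;
        }
    }

    // Serialize and deserialize, as happens when the format is shipped to a task.
    static ValuesFormat roundTrip(ValuesFormat f) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ValuesFormat) in.readObject();
        }
    }

    // Builds VALUES (1, 2), (3, 4), round-trips it, and counts the rows;
    // returns -1 if serialization fails.
    static int demoRowCount() {
        try {
            ArrayList<ArrayList<Object>> data = new ArrayList<>();
            data.add(new ArrayList<>(Arrays.asList((Object) 1, 2)));
            data.add(new ArrayList<>(Arrays.asList((Object) 3, 4)));
            return roundTrip(new ValuesFormat(data)).rows().size();
        } catch (Exception e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(demoRowCount());  // 2
    }
}
```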
[jira] [Assigned] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4271: -- Assignee: Jark Wu > There is no way to set parallelism of operators produced by CoGroupedStreams > > > Key: FLINK-4271 > URL: https://issues.apache.org/jira/browse/FLINK-4271 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Wenlong Lyu >Assignee: Jark Wu > > Currently, CoGroupStreams package the map/keyBy/window operators with a human > friendly interface, like: > dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(), both the > intermediate operators and final window operators can not be accessed by > users, and we cannot set attributes of the operators, which make co-group > hard to use in production environment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397004#comment-15397004 ] Jark Wu commented on FLINK-4271: The CoGroupStream will construct the following graph. {code}
source -> MAP ---
                 |-> WindowOp -> Sink
source -> MAP ---
{code} Currently, the parallelism of the MAP and WindowOp cannot be set. We can keep the MAP at the same parallelism as the previous operator, and we can change {{WindowedStream.apply}} to return a {{SingleOutputStreamOperator}} instead of a {{DataStream}}, so that we can change the WindowOp's parallelism. > There is no way to set parallelism of operators produced by CoGroupedStreams > > > Key: FLINK-4271 > URL: https://issues.apache.org/jira/browse/FLINK-4271 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Wenlong Lyu >Assignee: Jark Wu > > Currently, CoGroupStreams package the map/keyBy/window operators with a human > friendly interface, like: > dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(), both the > intermediate operators and final window operators can not be accessed by > users, and we cannot set attributes of the operators, which make co-group > hard to use in production environment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
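The API change suggested here, returning {{SingleOutputStreamOperator}} from {{apply}} so the window operator stays configurable, comes down to the return type of the last builder step. A minimal, Flink-independent model of that design choice (all names below are hypothetical):

```java
public class FluentParallelismDemo {

    // Minimal stand-in for DataStream: exposes no operator attributes.
    static class Stream {
        final String name;
        Stream(String name) { this.name = name; }
    }

    // Stand-in for SingleOutputStreamOperator: a Stream whose operator
    // attributes (here just parallelism) can still be configured fluently.
    static class ConfigurableStream extends Stream {
        int parallelism = 1;
        ConfigurableStream(String name) { super(name); }
        ConfigurableStream setParallelism(int p) { this.parallelism = p; return this; }
    }

    // If apply(...) returns the configurable subtype instead of the base
    // type, callers can tune the produced operator in the same chain.
    static ConfigurableStream apply(Stream a, Stream b) {
        return new ConfigurableStream(a.name + " coGroup " + b.name);
    }

    static int demoParallelism() {
        return apply(new Stream("A"), new Stream("B")).setParallelism(4).parallelism;
    }

    public static void main(String[] args) {
        System.out.println(demoParallelism());  // 4
    }
}
```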
[jira] [Comment Edited] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397004#comment-15397004 ] Jark Wu edited comment on FLINK-4271 at 7/28/16 6:17 AM: - The CoGroupStream will construct the following graph. {code}
source -> MAP ---
                 |-> WindowOp -> Sink
source -> MAP ---
{code} Currently, the parallelism of the MAP and WindowOp cannot be set. We can keep the MAP at the same parallelism as the previous operator, and we can change {{CoGroupedStreams.apply}} to return a {{SingleOutputStreamOperator}} instead of a {{DataStream}}, so that we can change the WindowOp's parallelism. The same thing should be done for {{JoinedStream}}. was (Author: jark): The CoGroupStream will construct the following graph. {code} source -> MAP --- |-> WindowOp -> Sink source -> MAP --- {code} By now , the MAP and WindowOp can not set parallelism. We can keep the MAP has same parallelism as previous operator. And we can change {{WindowedStream.apply}} to return a {{SingleOutputStreamOperator}} instead of {{DataStream}}, so that we can change WindowOp's parallelism. > There is no way to set parallelism of operators produced by CoGroupedStreams > > > Key: FLINK-4271 > URL: https://issues.apache.org/jira/browse/FLINK-4271 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Wenlong Lyu >Assignee: Jark Wu > > Currently, CoGroupStreams package the map/keyBy/window operators with a human > friendly interface, like: > dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(), both the > intermediate operators and final window operators can not be accessed by > users, and we cannot set attributes of the operators, which make co-group > hard to use in production environment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397187#comment-15397187 ] Jark Wu commented on FLINK-4271: Yes, {{SingleOutputStreamOperator}} can set parallelism, uid, name, and so on. But {{Where}} and {{EqualTo}} do not correspond to physical operators; they are only used to obtain the keys to co-group on, so they have no attributes to set. > There is no way to set parallelism of operators produced by CoGroupedStreams > > > Key: FLINK-4271 > URL: https://issues.apache.org/jira/browse/FLINK-4271 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Wenlong Lyu >Assignee: Jark Wu > > Currently, CoGroupStreams package the map/keyBy/window operators with a human > friendly interface, like: > dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(), both the > intermediate operators and final window operators can not be accessed by > users, and we cannot set attributes of the operators, which make co-group > hard to use in production environment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4270) 'as' in front of join does not work
[ https://issues.apache.org/jira/browse/FLINK-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4270: -- Assignee: Jark Wu > 'as' in front of join does not work > --- > > Key: FLINK-4270 > URL: https://issues.apache.org/jira/browse/FLINK-4270 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > The following example passes the validation but fails during toRexNode phase: > {code} > val customers = getCustomerDataSet(env).toTable(tEnv) > .as('id, 'mktSegment) > val orders = getOrdersDataSet(env).toTable(tEnv) > .as('orderId, 'custId, 'orderDate, 'shipPrio) > val items = > orders.join(customers) > .where('custId === 'id) > items.printOnTaskManager("ok") > {code} > Leads to > {code} > Exception in thread "main" java.lang.IllegalArgumentException: field [custId] > not found; input fields are: [id, mktSegment, id0, mktSegment0] > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:299) > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:284) > at > org.apache.flink.api.table.expressions.ResolvedFieldReference.toRexNode(fieldExpression.scala:59) > at > org.apache.flink.api.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:34) > at > org.apache.flink.api.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:34) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.flink.api.table.expressions.BinaryComparison.toRexNode(comparison.scala:34) > at > org.apache.flink.api.table.plan.logical.Filter.construct(operators.scala:158) > at > 
org.apache.flink.api.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:78) > at org.apache.flink.api.table.Table.getRelNode(table.scala:66) > at > org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:243) > at > org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) > at > org.apache.flink.api.scala.table.package$.table2RowDataSet(package.scala:77) > at > org.apache.flink.examples.scala.TPCHQuery3Table$.main(TPCHQuery3Table.scala:119) > at > org.apache.flink.examples.scala.TPCHQuery3Table.main(TPCHQuery3Table.scala) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4262) Consider null handling during sorting
[ https://issues.apache.org/jira/browse/FLINK-4262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405384#comment-15405384 ] Jark Wu commented on FLINK-4262: There is currently an open PR (https://github.com/apache/flink/pull/2282) by gallenvara that adds support for ORDER BY OFFSET FETCH. So maybe you should wait for him, or at least coordinate with him to avoid code conflicts. > Consider null handling during sorting > - > > Key: FLINK-4262 > URL: https://issues.apache.org/jira/browse/FLINK-4262 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Ivan Mushketyk >Priority: Minor > > Calcite's SQL parser allows to specify how to handle NULLs during sorting. > {code} > orderItem: > expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ] > {code} > Currently, the NULL FIRST/NULLS LAST is completely ignored but might be > helpful. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
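For reference, the NULLS FIRST / NULLS LAST semantics that FLINK-4262 asks for match the JDK's null-aware comparators; a small self-contained illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class NullsOrderingDemo {

    // ORDER BY x ASC NULLS FIRST: nulls sort before all non-null values.
    static List<Integer> nullsFirst(List<Integer> in) {
        List<Integer> out = new ArrayList<>(in);
        out.sort(Comparator.nullsFirst(Comparator.<Integer>naturalOrder()));
        return out;
    }

    // ORDER BY x ASC NULLS LAST: nulls sort after all non-null values.
    static List<Integer> nullsLast(List<Integer> in) {
        List<Integer> out = new ArrayList<>(in);
        out.sort(Comparator.nullsLast(Comparator.<Integer>naturalOrder()));
        return out;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(3, null, 1, null, 2);
        System.out.println(nullsFirst(values));  // [null, null, 1, 2, 3]
        System.out.println(nullsLast(values));   // [1, 2, 3, null, null]
    }
}
```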
[jira] [Commented] (FLINK-4288) Make it possible to unregister tables
[ https://issues.apache.org/jira/browse/FLINK-4288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15407083#comment-15407083 ] Jark Wu commented on FLINK-4288: Because {{SchemaPlus}} has no {{remove}} function, we can't unregister tables easily. Do you have any ideas on how to do this? > Make it possible to unregister tables > - > > Key: FLINK-4288 > URL: https://issues.apache.org/jira/browse/FLINK-4288 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > > Table names can not be changed yet. After registration you can not modify the > table behind a table name. Maybe this behavior is too restrictive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4359) Add INTERVAL type
[ https://issues.apache.org/jira/browse/FLINK-4359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15415204#comment-15415204 ] Jark Wu commented on FLINK-4359: Do we have any ideas on how to implement StreamSQL windows? Currently, Calcite doesn't support windows in StreamSQL. > Add INTERVAL type > - > > Key: FLINK-4359 > URL: https://issues.apache.org/jira/browse/FLINK-4359 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > In order to start with StreamSQL windows we need a way to define intervals in > time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4366) Enforce parallelism=1 For AllWindowedStream
[ https://issues.apache.org/jira/browse/FLINK-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4366: -- Assignee: Jark Wu > Enforce parallelism=1 For AllWindowedStream > --- > > Key: FLINK-4366 > URL: https://issues.apache.org/jira/browse/FLINK-4366 > Project: Flink > Issue Type: Improvement >Reporter: Aljoscha Krettek >Assignee: Jark Wu > > Right now, it is possible to use {{DataStream.windowAll/timeWindowAll}} and > then set a different parallelism afterwards. Flink will silently accept this > and spawn the number of parallel operators, only one instance of those will > do all the processing, though, since the elements are implicitly keyed by a > dummy key. > We should throw an exception if users try to set a parallelism on an > all-windowed stream. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
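The enforcement requested in FLINK-4366 amounts to rejecting a non-unit parallelism at the API boundary rather than silently spawning idle operator instances. A minimal, Flink-independent sketch (class names hypothetical):

```java
public class NonParallelGuardDemo {

    // Model of an all-windowed operator: elements are implicitly keyed by a
    // dummy key, so only one parallel instance ever does any work.
    static class AllWindowedOp {
        int parallelism = 1;

        // Throw instead of accepting a misleading parallelism setting.
        AllWindowedOp setParallelism(int p) {
            if (p != 1) {
                throw new UnsupportedOperationException(
                    "An all-windowed stream is non-parallel; parallelism must be 1.");
            }
            this.parallelism = p;
            return this;
        }
    }

    // True if the given parallelism is rejected by the guard.
    static boolean rejects(int p) {
        try {
            new AllWindowedOp().setParallelism(p);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(1));  // false
        System.out.println(rejects(4));  // true
    }
}
```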
[jira] [Commented] (FLINK-4359) Add INTERVAL type
[ https://issues.apache.org/jira/browse/FLINK-4359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15417187#comment-15417187 ] Jark Wu commented on FLINK-4359: Okay, I see. I'm interested in this and would be glad to help with anything. > Add INTERVAL type > - > > Key: FLINK-4359 > URL: https://issues.apache.org/jira/browse/FLINK-4359 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Fix For: 1.2.0 > > > In order to start with StreamSQL windows we need a way to define intervals in > time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)