Re: Reply: RE: [DISCUSS] FLIP-11: Table API Stream Aggregations
Hi all!

I think that in order to get a better hold on how we want to build the
Table API, we need to *decide what the role of the Table API should be*.
We touched on that a few times, but I think we still have different ideas
about that.

To get there, let me take a step back and look at the design of Stream
SQL again. There were basically two competing approaches:

  (1) Keep SQL as it is and make it run on infinite streams by
      introducing dynamic tables.
  (2) Create a new language that is similar to SQL, but designed with
      streaming concepts in mind (first-class support for time and
      windows).

Both approaches had good points. The Stream SQL design doc that was
posted followed approach (1): keep SQL as it is.

Now, for the Table API, we seem to be having a similar discussion again:

  (1) Let the Table API be as similar to SQL as possible, and simply make
      it feel "fluently embedded" in Scala.
  (2) Define the Table API as one would define a new and clean DSL for
      streaming. SQL-inspired, of course. Where SQL syntax feels natural,
      use the SQL syntax, but make it very accessible to Java/Scala
      (non-SQL) programmers.

I am personally more in favor of variant (2) for the following reasons:

  - We already have SQL compliant with the standards and tools.
  - Mirroring SQL too closely into the Table API has the marginal benefit
    that someone close to SQL will find it a bit more familiar. Not sure
    if that is even the case, as they have to re-learn the fluent DSL and
    Scala concepts.
  - We are making it more difficult for all those that come from a more
    Scala/Java DataStream background and simply want to move "a layer
    up", getting schema and more optimizations into the equation.

What would that mean for the specific issues that are discussed in
FLIP-11? Based on interpreting the Table API as a re-imagined streaming
DSL, I would suggest to

  - Not put the window definition into the groupBy clause. It is just
    unexpected for all who are not super familiar with SQL, and hard to
    discover in the IDE.
    A separate window clause is simpler for users coming from the
    DataStream background (or other streaming APIs) and it is more
    discoverable in the IDE.
  - I like the idea of having separate ".window()" and ".rowWindow()"
    clauses. It makes it more explicit that very different things will
    happen.
  - I would prefer not to have a "partitionBy" statement. When we restrict
    the Table API, at least initially, to having one partitioning for the
    windows, we should be able to express the partitioning by simply
    adding it to the fields in the "groupBy" clause. That would make the
    API more accessible to users who are not SQL power users.

What do others think?

Greetings,
Stephan

On Sat, Oct 15, 2016 at 1:02 AM, Fabian Hueske wrote:

> Thanks for your reply Shaoxuan!
>
> Please see my replies below.
>
> Best, Fabian
>
> 2016-10-14 11:34 GMT+02:00 Sean Wang:
>
> > Thanks for your quick reply, Fabian.
> >
> > I have a few minor comments:
> >
> > - Agree that we should consider GroupBy without window after the new
> >   SQL proposal is settled down.
>
> OK, so we keep this as it is for now. GroupBy without windows will be
> supported later when we are able to "guard" the memory requirements of
> that operation.
>
> > - For Java API, we can keep window() call, and put window alias into
> >   Groupby clause. This can be also applied to rowwindow case.
>
> Referring to the window alias in the groupBy clause would require to
> invert the methods, i.e., groupBy().window() -> window().groupBy(). I am
> not sure if that is more intuitive. Also, Scala and Java are using the
> same class (Table) but different methods (Java uses String parameters,
> Scala Expression parameters). In my opinion it makes sense to have both
> APIs closely synced. So I would either keep window() (after groupBy) for
> Scala and Java or remove it for both.
>
> > - +1 to support replace groupby() by partitionby(). BTW, in the case
> >   of over, instead of partitionby, are we going to support orderby?
> >   If yes, I would suggest to define rowwindow as
> >   rowwindow(PartionByParaType, OrderBy ParaType, WindowParaType).
>
> The current FLIP-11 proposal supports defining both partitionBy and
> orderBy (with a few restrictions).
> PartitionBy is defined for all windows alike by calling
>
>   table.partitionBy(...).rowWindow(Window1 as w1, Window2 as w2)
>     .select(count() over w1)
>
> Allowing windows with different partitioning would mean that data is
> shuffled to different nodes and that we need a distributed join to
> assemble the result rows. In principle this could be done but would be
> very expensive to execute (applies to batch and streaming). In my
> opinion, we should not support this case.
>
> OrderBy is implicitly supported by the on() clause of RowWindows, e.g.,
>
>   rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)
>
> says that the window is ordered on the rowtime, i.e.,
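
The point that ".window()" and ".rowWindow()" do very different things can be illustrated with a minimal sketch on plain Scala collections. This is not Flink or FLIP-11 API: `Event`, `tumblingSum`, and `rowSum` are hypothetical names introduced only for illustration, and the sketch assumes non-negative timestamps and a single partitioning key (`user`) for both window kinds.

```scala
// Hypothetical stand-ins; nothing here is Flink or FLIP-11 API.
object WindowKinds {
  final case class Event(user: String, ts: Long, amount: Long)

  // Group window: tumbling windows of length `size`.
  // Produces ONE result per (user, window start). Assumes ts >= 0.
  def tumblingSum(events: Seq[Event], size: Long): Map[(String, Long), Long] =
    events
      .groupBy(e => (e.user, e.ts - e.ts % size)) // key plus window start
      .map { case (key, es) => key -> es.map(_.amount).sum }

  // Row window: for EVERY input row, sum the row itself and up to
  // `preceding` earlier rows of the same user, ordered by ts.
  // Produces one result per input row.
  def rowSum(events: Seq[Event], preceding: Int): Seq[(Event, Long)] =
    events
      .groupBy(_.user) // single partitioning shared by the window
      .values
      .flatMap { es =>
        val sorted = es.sortBy(_.ts)
        sorted.indices.map { i =>
          val from = math.max(0, i - preceding)
          sorted(i) -> sorted.slice(from, i + 1).map(_.amount).sum
        }
      }
      .toSeq
}
```

With four input events, `tumblingSum` returns one entry per user and window, while `rowSum` returns four entries, one per input row. The order of rows across users in `rowSum` is unspecified in this sketch.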
Re: Reply: RE: [DISCUSS] FLIP-11: Table API Stream Aggregations
Thanks for your quick reply, Fabian.

I have a few minor comments:

- Agree that we should consider GroupBy without window after the new SQL
  proposal is settled down.
- For Java API, we can keep window() call, and put window alias into
  Groupby clause. This can be also applied to rowwindow case.
- +1 to support replace groupby() by partitionby(). BTW, in the case of
  over, instead of partitionby, are we going to support orderby? If yes, I
  would suggest to define rowwindow as rowwindow(PartionByParaType,
  OrderBy ParaType, WindowParaType).

So

- moving windows into the groupBy() call: +1
- making over() mandatory for rowWindow() with a single window
  definition.
- additionally allowing window definitions in over(): +1 yes for Scala,
  but use alias for the Java API.
- using partitionBy() instead of groupBy() for row windows?: +1, but
  better to consider orderby; it may be even better to move partitionBy()
  into rowwindow.

Regards,
Shaoxuan

On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske wrote:

> Hi everybody,
>
> happy to see a good discussion here :-)
> I'll reply to Shaoxuan's mail first and comment on Zhangrucong's
> question in a separate mail.
>
> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL
> we should definitely follow the standard (batch) SQL syntax.
> In my opinion, the Table API does not necessarily have to be as close as
> possible to SQL but should try to make a few things easier and also
> safer (easier is of course subjective).
>
> - GroupBy without windows: These are currently intentionally not
>   supported and also not part of FLIP-11. Our motivation for not
>   supporting this is to guard the user from defining a query that fails
>   when being executed due to a very memory-consuming operation. FLIP-11
>   provides a way to define such a query as a sliding row window with
>   unbounded preceding rows. With the upcoming SQL proposal, queries that
>   consume unbounded memory should be identified and rejected.
>   I would be in favor of allowing groupBy without windows once the
>   guarding mechanisms are in place.
>
> - GroupBy with window: I think this is a question of taste. Having a
>   window() call makes the feature more explicit in my opinion. However,
>   I'm not opposed to moving the windows into the groupBy clause.
>   Implementation-wise it should be easy to move the window definition
>   into the groupBy clause for the Scala Table API. For the Java Table API
>   we would need to extend the parser quite a bit because windows would
>   need to be defined as Strings and not via objects.
>
> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
>   (implemented by PostgreSQL and Calcite) which allows to have
>   "reusable" window definitions. I think this is a desirable feature. In
>   the FLIP-11 proposal the over() clause in select() refers to the
>   predefined windows with aliases. In case only one window is defined,
>   the over() clause is optional and the same (and only) window is applied
>   to all aggregates. I think we can make the over() call mandatory to
>   have the windowing more explicit. It should also be possible to extend
>   the over clause to directly accept RowWindows instead of window
>   aliases. I would not make this a priority at the moment, but a feature
>   that could be added later, because rowWindow() and over() cover all
>   cases. Similar as for GroupBy with windows, we would need to extend the
>   parser for the Java Table API though.
>
> Finally, I have a suggestion of my own:
> In FLIP-11, groupBy() is used to define the partitioning of RowWindows.
> I think this should be changed to partitionBy() because groupBy() groups
> data and applies an aggregation to all rows of a group which is not
> happening here. In original SQL, the OVER clause features a PARTITION BY
> clause.
> We are moving this out of the window definition, i.e., OVER clause, to
> enforce the same partitioning for all windows (different partitionings
> would be a challenge to execute in a parallel system).
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() mandatory for rowWindow() with a single window
>   definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong:
>
>> Hi shaoxuan:
>>
>> I think, the streamsql must be executed in table environment. So I call
>> this table API's StreamSQL. What do you call for this, stream Table API
>> or streamsql? It is fu
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>>   .map(f => (f, 1L, 1L))
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>>
>> So in my opinion, the grammar which is marked
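
The quoted remark that a groupBy without windows can be expressed as a row window with unbounded preceding rows can be sketched as a running aggregate. This is a minimal illustration on plain Scala collections, not Flink or FLIP-11 API; `RunningAggregate` and `runningSum` are hypothetical names. In this sketch the state keeps one accumulator per distinct key and never drops it, which is one way to read the memory concern raised in the quoted mail.

```scala
// Hypothetical sketch; not Flink or FLIP-11 API.
object RunningAggregate {
  // Processes (key, value) rows in arrival order. For every row it emits the
  // sum over the current row and ALL preceding rows with the same key.
  // Returns the emitted rows and the final per-key state.
  def runningSum(rows: Seq[(String, Long)]): (Seq[(String, Long)], Map[String, Long]) = {
    val init = (Vector.empty[(String, Long)], Map.empty[String, Long])
    rows.foldLeft(init) { case ((out, state), (key, value)) =>
      val updated = state.getOrElse(key, 0L) + value
      // The state entry for `key` is only ever updated, never removed.
      (out :+ (key -> updated), state.updated(key, updated))
    }
  }
}
```

The returned state map has as many entries as there are distinct keys in the input, so on an unbounded stream with an unbounded key space it would keep growing.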
Re: Reply: RE: [DISCUSS] FLIP-11: Table API Stream Aggregations
Hi Zhangrucong,

yes, we want to use Calcite's SQL parser including its window syntax,
i.e.,

- the standard SQL OVER windows (in streaming with a few restrictions
  such as no different partitionings or orders)
- the GroupBy window functions (TUMBLE, HOP, SESSION).

The GroupBy window functions are not implemented in Calcite yet. There is
CALCITE-1345 [1] to track the issue.

As Shaoxuan mentioned, we are not using the STREAM keyword to be SQL
compliant.

Best, Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345

2016-10-13 12:05 GMT+02:00 Fabian Hueske:

> Hi everybody,
>
> happy to see a good discussion here :-)
> I'll reply to Shaoxuan's mail first and comment on Zhangrucong's
> question in a separate mail.
>
> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL
> we should definitely follow the standard (batch) SQL syntax.
> In my opinion, the Table API does not necessarily have to be as close as
> possible to SQL but should try to make a few things easier and also
> safer (easier is of course subjective).
>
> - GroupBy without windows: These are currently intentionally not
>   supported and also not part of FLIP-11. Our motivation for not
>   supporting this is to guard the user from defining a query that fails
>   when being executed due to a very memory-consuming operation. FLIP-11
>   provides a way to define such a query as a sliding row window with
>   unbounded preceding rows. With the upcoming SQL proposal, queries that
>   consume unbounded memory should be identified and rejected. I would be
>   in favor of allowing groupBy without windows once the guarding
>   mechanisms are in place.
>
> - GroupBy with window: I think this is a question of taste. Having a
>   window() call makes the feature more explicit in my opinion. However,
>   I'm not opposed to moving the windows into the groupBy clause.
>   Implementation-wise it should be easy to move the window definition
>   into the groupBy clause for the Scala Table API.
>   For the Java Table API we would need to extend the parser quite a bit
>   because windows would need to be defined as Strings and not via
>   objects.
>
> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
>   (implemented by PostgreSQL and Calcite) which allows to have
>   "reusable" window definitions. I think this is a desirable feature. In
>   the FLIP-11 proposal the over() clause in select() refers to the
>   predefined windows with aliases. In case only one window is defined,
>   the over() clause is optional and the same (and only) window is applied
>   to all aggregates. I think we can make the over() call mandatory to
>   have the windowing more explicit. It should also be possible to extend
>   the over clause to directly accept RowWindows instead of window
>   aliases. I would not make this a priority at the moment, but a feature
>   that could be added later, because rowWindow() and over() cover all
>   cases. Similar as for GroupBy with windows, we would need to extend the
>   parser for the Java Table API though.
>
> Finally, I have a suggestion of my own:
> In FLIP-11, groupBy() is used to define the partitioning of RowWindows.
> I think this should be changed to partitionBy() because groupBy() groups
> data and applies an aggregation to all rows of a group which is not
> happening here. In original SQL, the OVER clause features a PARTITION BY
> clause. We are moving this out of the window definition, i.e., OVER
> clause, to enforce the same partitioning for all windows (different
> partitionings would be a challenge to execute in a parallel system).
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() mandatory for rowWindow() with a single window
>   definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong:
>
>> Hi shaoxuan:
>>
>> I think, the streamsql must be executed in table environment.
>> So I call this table API's StreamSQL. What do you call for this, stream
>> Table API or streamsql? It is fu
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>>   .map(f => (f, 1L, 1L))
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>>
>> So in my opinion, the grammar which is marked red should be compatible
>> with calcite's StreamSQL grammar.
>>
>> By the way, thanks very much for telling me the modified content in
>> Flink StreamSQL. I will look at the new proposal.
>>
>> Thanks!
>>
>> From: Sean Wang [mailto:wshaox...@gmail.com]
>> Sent: October 13, 2016, 16:29
>> To: dev@flink.apache.org; Zhangrucong
>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi zhangrucong,
>> I am not sure what you mean by "table API'S
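
For the group-window functions named at the top of this message, a rough sketch of window assignment may help. It is not Calcite or Flink code: `GroupWindows`, `tumble`, and `hop` are hypothetical helpers, and the sketch assumes the common reading that a window covers the half-open interval [start, start + size), with non-negative timestamps and 0 < slide <= size. SESSION windows are left out because they depend on gaps between neighbouring rows rather than on a single timestamp.

```scala
// Hypothetical helpers; not Calcite or Flink API.
object GroupWindows {
  // Start of the tumbling window of length `size` that contains `ts`.
  // Assumes ts >= 0 and size > 0.
  def tumble(ts: Long, size: Long): Long = ts - ts % size

  // Starts of all hopping windows (length `size`, advancing by `slide`)
  // that contain `ts`, latest window first.
  // Assumes ts >= 0 and 0 < slide <= size.
  def hop(ts: Long, size: Long, slide: Long): Seq[Long] = {
    val latest = ts - ts % slide // latest window start that is <= ts
    Iterator
      .iterate(latest)(_ - slide)
      .takeWhile(start => start > ts - size) // window must still cover ts
      .toList
  }
}
```

A row belongs to exactly one tumbling window but to several hopping windows when `slide` is smaller than `size`; with `slide == size` the two coincide.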
Re: Reply: RE: [DISCUSS] FLIP-11: Table API Stream Aggregations
Hi everybody,

happy to see a good discussion here :-)
I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question
in a separate mail.

Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we
should definitely follow the standard (batch) SQL syntax.
In my opinion, the Table API does not necessarily have to be as close as
possible to SQL but should try to make a few things easier and also safer
(easier is of course subjective).

- GroupBy without windows: These are currently intentionally not supported
  and also not part of FLIP-11. Our motivation for not supporting this is
  to guard the user from defining a query that fails when being executed
  due to a very memory-consuming operation. FLIP-11 provides a way to
  define such a query as a sliding row window with unbounded preceding
  rows. With the upcoming SQL proposal, queries that consume unbounded
  memory should be identified and rejected. I would be in favor of
  allowing groupBy without windows once the guarding mechanisms are in
  place.

- GroupBy with window: I think this is a question of taste. Having a
  window() call makes the feature more explicit in my opinion. However,
  I'm not opposed to moving the windows into the groupBy clause.
  Implementation-wise it should be easy to move the window definition into
  the groupBy clause for the Scala Table API. For the Java Table API we
  would need to extend the parser quite a bit because windows would need
  to be defined as Strings and not via objects.

- RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
  (implemented by PostgreSQL and Calcite) which allows to have "reusable"
  window definitions. I think this is a desirable feature. In the FLIP-11
  proposal the over() clause in select() refers to the predefined windows
  with aliases. In case only one window is defined, the over() clause is
  optional and the same (and only) window is applied to all aggregates. I
  think we can make the over() call mandatory to have the windowing more
  explicit.
  It should also be possible to extend the over clause to directly accept
  RowWindows instead of window aliases. I would not make this a priority
  at the moment, but a feature that could be added later, because
  rowWindow() and over() cover all cases. Similar as for GroupBy with
  windows, we would need to extend the parser for the Java Table API
  though.

Finally, I have a suggestion of my own:
In FLIP-11, groupBy() is used to define the partitioning of RowWindows. I
think this should be changed to partitionBy() because groupBy() groups
data and applies an aggregation to all rows of a group which is not
happening here. In original SQL, the OVER clause features a PARTITION BY
clause. We are moving this out of the window definition, i.e., OVER
clause, to enforce the same partitioning for all windows (different
partitionings would be a challenge to execute in a parallel system).

@Timo and all: What do you think about:

- moving windows into the groupBy() call
- making over() mandatory for rowWindow() with a single window definition
- additionally allowing window definitions in over()
- using partitionBy() instead of groupBy() for row windows?

Best, Fabian

2016-10-13 11:10 GMT+02:00 Zhangrucong:

> Hi shaoxuan:
>
> I think, the streamsql must be executed in table environment. So I call
> this table API's StreamSQL. What do you call for this, stream Table API
> or streamsql? It is fu
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>   .map(f => (f, 1L, 1L))
> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>
> So in my opinion, the grammar which is marked red should be compatible
> with calcite's StreamSQL grammar.
>
> By the way, thanks very much for telling me the modified content in
> Flink StreamSQL. I will look at the new proposal.
>
> Thanks!
>
> From: Sean Wang [mailto:wshaox...@gmail.com]
> Sent: October 13, 2016, 16:29
> To: dev@flink.apache.org; Zhangrucong
> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>
> Hi zhangrucong,
> I am not sure what you mean by "table API'S StreamSQL", I guess you mean
> "stream TableAPI"?
> TableAPI should be compatible with calcite SQL. (By compatible, my
> understanding is that both TableAPI and SQL will be translated to the
> same logical plan - the same set of REL and REX).
> BTW, please note that we recently merged a change to remove the STREAM
> keyword for flink stream SQL (FLINK-4546). In our opinion, batch and
> stream do not necessarily need to be differentiated at the SQL level.
> The major difference between batch and stream is "WHEN and HOW to emit
> the result".
> We have been working on a new proposal with Fabian on this change. I
> guess it will be sent out for review very soon.
>
> Regards,
> Shaoxuan
>
>
> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong
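
The quoted idea that Table API and SQL are "compatible" when both translate to the same logical plan can be shown with a toy model. Everything below is hypothetical and unrelated to Calcite's actual REL and REX classes: `Plan`, `Scan`, `Filter`, `Table`, and the single-pattern `sql` function exist only to illustrate two front ends producing one plan representation.

```scala
// Toy model; not Calcite or Flink API.
object SamePlan {
  // A minimal logical plan: scan a table, optionally filter on column = value.
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Filter(input: Plan, column: String, equalTo: String) extends Plan

  // Fluent front end, in the spirit of a Table API.
  final case class Table(plan: Plan) {
    def where(column: String, equalTo: String): Table =
      Table(Filter(plan, column, equalTo))
  }
  def scan(name: String): Table = Table(Scan(name))

  // Toy SQL front end: understands only `SELECT * FROM t WHERE c='v'`.
  private val Query = """SELECT \* FROM (\w+) WHERE (\w+)='(\w*)'""".r
  def sql(query: String): Option[Table] = query match {
    case Query(t, c, v) => Some(Table(Filter(Scan(t), c, v)))
    case _              => None
  }
}
```

Under these assumptions, `SamePlan.sql("SELECT * FROM Order WHERE userID='A'")` and `SamePlan.scan("Order").where("userID", "A")` yield equal plans, so anything downstream of the plan does not need to know which front end was used.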