Re: Reply: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

2016-10-26 Thread Stephan Ewen
Hi all!

I think that in order to get a better hold on how we want to build the
Table API, we need to *decide what the role of the Table API should be*. We
touched on that a few times, but I think we still have different ideas
about that.

To get there, let me take a step back and look at the design of Stream SQL
again. There were basically two competing approaches:

(1) Keep SQL as it is and make it run on infinite streams by introducing
dynamic tables
(2) Create a new language that is similar to SQL, but designed with streaming
concepts in mind (first-class support for time and windows)

Both approaches had good points. The Stream SQL design doc posted followed
approach (1) - keep SQL as it is.



Now, for the Table API, we seem to be having a similar discussion again.

(1) Let the Table API be as similar to SQL as possible, simply make it feel
"fluently embedded" in Scala.
(2) Define the Table API as one would define a new and clean DSL for
streaming. SQL inspired, of course. Where SQL syntax feels natural, use the
SQL syntax, but make it very accessible to Java/Scala (non-SQL) programmers.


I am personally more in favor of variant (2) for the following reasons:

  - We already have SQL compliant with the standards and tools.

  - Mirroring SQL too closely into the Table API has the marginal benefit
that someone close to SQL will find it a bit more familiar. I am not sure
that is even the case, as they have to re-learn the fluent DSL and Scala
concepts.

  - We are making it more difficult for all those who come from a more
Scala/Java DataStream background and simply want to move "a layer up",
getting schema and more optimizations into the equation.




What would that mean for the specific issues that are discussed in FLIP-11?
Based on interpreting the Table API as a re-imagined streaming DSL, I would
suggest the following:

  - Not put the window definition into the groupBy clause. It is simply
unexpected for anyone who is not very familiar with SQL, and it is hard to
discover in the IDE. A separate window clause is simpler for users coming
from the DataStream background (or other streaming APIs) and it is more
discoverable in the IDE.

  - I like the idea of having separate ".window()" and ".rowWindow()"
clauses. Makes it more explicit that very different things will happen.

  - I would prefer to not have a "partitionBy" statement. If we restrict
the Table API, at least initially, to having one partitioning for the
windows, we should be able to express the partitioning by simply adding it
to the fields in the "groupBy" clause. That would make the API more
accessible to users who are not SQL power users.
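
To make the suggestions above a bit more concrete, here is a minimal sketch in
plain Scala (no Flink dependency). It is only a toy model: the names Event,
Tumble, Table, GroupedTable, WindowedTable and sumAmount are hypothetical and
are not part of FLIP-11 or of the actual Table API. It assumes non-negative
timestamps. The point is only that the partitioning lives in groupBy while the
window is a separate, discoverable clause.

```scala
object WindowClauseSketch {
  // One record of a hypothetical input stream: key, event time (ms), value.
  final case class Event(user: String, rowtime: Long, amount: Long)

  // Hypothetical window spec: fixed-size, non-overlapping windows.
  final case class Tumble(sizeMs: Long)

  final class Table(rows: Seq[Event]) {
    // The partitioning is just a field in the ordinary groupBy clause.
    def groupBy(key: Event => String): GroupedTable = new GroupedTable(rows, key)
  }

  final class GroupedTable(rows: Seq[Event], key: Event => String) {
    // The window is a separate clause, so IDE completion lists it as a method.
    def window(w: Tumble): WindowedTable = new WindowedTable(rows, key, w)
  }

  final class WindowedTable(rows: Seq[Event], key: Event => String, w: Tumble) {
    // Sum `amount` per (key, window start), sorted for a stable result order.
    def sumAmount: Seq[(String, Long, Long)] =
      rows
        .groupBy(e => (key(e), e.rowtime - e.rowtime % w.sizeMs))
        .map { case ((k, start), es) => (k, start, es.map(_.amount).sum) }
        .toSeq
        .sorted
  }
}
```

A call would then read table.groupBy(_.user).window(Tumble(10L)).sumAmount,
with no separate partitionBy needed for the single-partitioning case.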


What do others think?

Greetings,
Stephan


On Sat, Oct 15, 2016 at 1:02 AM, Fabian Hueske wrote:

> Thanks for your reply Shaoxuan!
>
> Please see my replies below.
>
> Best, Fabian
>
> 2016-10-14 11:34 GMT+02:00 Sean Wang :
>
> > Thanks for your quick reply, Fabian.
> >
> > I have a few minor comments:
> >
> > 
> > - Agree that we should consider GroupBy without window after the new SQL
> > proposal is settled.
> >
> >
> OK, so we keep this as it is for now. GroupBy without windows will be
> supported later when we are able to "guard" the memory requirements of that
> operation.
>
>
> > 
> > - For the Java API, we can keep the window() call and put the window alias
> > into the groupBy clause. This can also be applied to the rowWindow case.
> >
> >
> Referring to the window alias in the groupBy clause would require inverting
> the methods, i.e., groupBy().window() -> window().groupBy(). I am not sure
> if that is more intuitive. Also, Scala and Java are using the same class
> (Table) but different methods (Java uses String parameters, Scala uses
> Expression parameters). In my opinion it makes sense to have both APIs
> closely synced. So I would either keep window() (after groupBy) for Scala
> and Java or remove it for both.
>
>
> >
> > - +1 to replacing groupBy() with partitionBy(). BTW, in the case of
> > over, instead of partitionBy, are we going to support orderBy? If yes, I
> > would suggest defining rowWindow as rowWindow(PartitionByParaType,
> > OrderByParaType, WindowParaType).
> >
> >
> The current FLIP-11 proposal supports defining both partitionBy and orderBy
> (with a few restrictions).
> PartitionBy is defined for all windows alike by calling
>
>   table.partitionBy(...)
>     .rowWindow(Window1 as w1, Window2 as w2)
>     .select(count() over w1)
>
> Allowing windows with different partitioning would mean that data is
> shuffled to different nodes and that we need a distributed join to assemble
> the result rows. In principle this could be done but would be very
> expensive to execute (applies to batch and streaming). In my opinion, we
> should not support this case.
>
> OrderBy is implicitly supported by the on() clause of RowWindows, e.g.,
>
> rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)
>
> says that the window is ordered on the rowtime, i.e., 

Re: Reply: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

2016-10-14 Thread Sean Wang
Thanks for your quick reply, Fabian.

I have a few minor comments:


- Agree that we should consider GroupBy without window after the new SQL
proposal is settled.


- For the Java API, we can keep the window() call and put the window alias
into the groupBy clause. This can also be applied to the rowWindow case.

- +1 to replacing groupBy() with partitionBy(). BTW, in the case of
over, instead of partitionBy, are we going to support orderBy? If yes, I
would suggest defining rowWindow as rowWindow(PartitionByParaType,
OrderByParaType, WindowParaType).

So
- moving windows into the groupBy() call: +1
- making over() mandatory for rowWindow() with a single window definition.
- additionally allowing window definitions in over(): +1, yes for Scala,
but use an alias for the Java API.
- using partitionBy() instead of groupBy() for row windows?: +1, but it would
be better to consider orderBy as well; it may be even better to move
partitionBy() into rowWindow.
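
As a rough illustration of the rowWindow(partitionBy, orderBy, window) shape
suggested above, here is a small sketch in plain Scala. The RowWindow case
class, its field names and the sumOver helper are hypothetical and only model
the idea on an in-memory Seq; they are not a proposed signature for the Table
API.

```scala
object RowWindowSketch {
  final case class Row(user: String, rowtime: Long, amount: Long)

  // Hypothetical bundle of the three parameters discussed above.
  final case class RowWindow(
      partitionBy: Row => String, // partitioning key
      orderBy: Row => Long,       // ordering inside one partition
      precedingRows: Int)         // earlier rows included in the frame

  // For every input row, sum `amount` over the current row and up to
  // `precedingRows` earlier rows of the same partition.
  def sumOver(rows: Seq[Row], w: RowWindow): Seq[(Row, Long)] =
    rows.groupBy(w.partitionBy).toSeq.sortBy(_._1).flatMap { case (_, part) =>
      val ordered = part.sortBy(w.orderBy)
      ordered.indices.map { i =>
        val frame = ordered.slice(math.max(0, i - w.precedingRows), i + 1)
        ordered(i) -> frame.map(_.amount).sum
      }
    }
}
```

Bundling the partitioning into the window definition like this would allow a
different partitioning per window, which is exactly the case that is expensive
to execute in a parallel system.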

Regards,
Shaoxuan


On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske wrote:

> Hi everybody,
>
> happy to see a good discussion here :-)
> I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question
> in a separate mail.
>
> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we
> should definitely follow the standard (batch) SQL syntax.
> In my opinion, the Table API does not necessarily have to be as close as
> possible to SQL but should try to make a few things easier and also safer
> (easier is of course subjective).
>
> - GroupBy without windows: These are currently intentionally not supported
> and also not part of FLIP-11. Our motivation for not supporting this is to
> guard the user from defining a query that fails when being executed due to
> a very memory-consuming operation. FLIP-11 provides a way to define such a
> query as a sliding row window with unbounded preceding rows. With the
> upcoming SQL proposal, queries that consume unbounded memory should be
> identified and rejected. I would be in favor of allowing groupBy without
> windows once the guarding mechanisms are in place.
>
> - GroupBy with window: I think this is a question of taste. Having a
> window() call makes the feature more explicit in my opinion. However, I'm
> not opposed to moving the windows into the groupBy clause.
> Implementation-wise it should be easy to move the window definition into the
> groupBy clause for the Scala Table API. For the Java Table API we would
> need to extend the parser quite a bit because windows would need to be
> defined as Strings and not via objects.
>
> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
> (implemented by PostgreSQL and Calcite), which allows "reusable"
> window definitions. I think this is a desirable feature. In the FLIP-11
> proposal the over() clause in select() refers to the predefined windows
> with aliases. In case only one window is defined, the over() clause is
> optional and the same (and only) window is applied to all aggregates. I
> think we can make the over() call mandatory to have the windowing more
> explicit. It should also be possible to extend the over clause to directly
> accept RowWindows instead of window aliases. I would not make this a
> priority at the moment, but rather a feature that could be added later,
> because rowWindow() and over() cover all cases. As with GroupBy with
> windows, we would need to extend the parser for the Java Table API though.
>
> Finally, I have a suggestion of my own:
> In FLIP-11, groupBy() is used to define the partitioning of RowWindows. I
> think this should be changed to partitionBy() because groupBy() groups data
> and applies an aggregation to all rows of a group, which is not happening
> here. In standard SQL, the OVER clause features a PARTITION BY clause. We
> are moving this out of the window definition, i.e., the OVER clause, to
> enforce the same partitioning for all windows (different partitionings
> would be a challenge to execute in a parallel system).
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() mandatory for rowWindow() with a single window definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong :
>
>> Hi shaoxuan:
>>
>> I think the StreamSQL must be executed in a table environment. So I call
>> this the Table API's StreamSQL. What do you call this, stream Table API or
>> StreamSQL? It is fu
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> // map each input line to a (userID, count, num) tuple before registering it
>> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>>   .map(f => (f, 1L, 1L))
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>>
>> So in my opinion, the grammar which is marked 

Re: Reply: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

2016-10-13 Thread Fabian Hueske
Hi Zhangrucong,

yes, we want to use Calcite's SQL parser including its window syntax, i.e.,

- the standard SQL OVER windows (in streaming with a few restrictions such
as no different partitionings or orders)
- the GroupBy window functions (TUMBLE, HOP, SESSION).

The GroupBy window functions are not implemented in Calcite yet. There is
CALCITE-1345 [1] to track the issue.
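
For readers less familiar with the three GroupBy window functions named above,
here is a small sketch in plain Scala of how each one assigns timestamps to
windows. The helper names and parameter order are my own and are not Calcite's
or Flink's API. The sketch assumes non-negative millisecond timestamps, and for
sessions it assumes that a new session starts when the gap to the previous
timestamp is larger than gapMs; the exact boundary behavior in an actual
implementation may differ.

```scala
object GroupWindowSketch {
  // TUMBLE: each timestamp belongs to exactly one window of length sizeMs.
  def tumbleStart(ts: Long, sizeMs: Long): Long = ts - ts % sizeMs

  // HOP: windows of length sizeMs start every slideMs, so one timestamp can
  // fall into several overlapping windows. Returns all window starts, ascending.
  def hopStarts(ts: Long, sizeMs: Long, slideMs: Long): List[Long] = {
    val lastStart = ts - ts % slideMs
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > ts - sizeMs).toList.reverse
  }

  // SESSION: split sorted timestamps wherever the gap to the previous
  // timestamp is larger than gapMs (assumed boundary rule, see above).
  def sessions(sortedTs: Seq[Long], gapMs: Long): List[List[Long]] =
    sortedTs.foldLeft(List.empty[List[Long]]) {
      case (cur :: done, ts) if ts - cur.head <= gapMs => (ts :: cur) :: done
      case (acc, ts) => List(ts) :: acc
    }.map(_.reverse).reverse
}
```

For example, with a size of 10 and a slide of 5, timestamp 7 falls into the
windows starting at 0 and at 5.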

As Shaoxuan mentioned, we are not using the STREAM keyword to be SQL
compliant.

Best, Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345


Re: Reply: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

2016-10-13 Thread Fabian Hueske
Hi everybody,

happy to see a good discussion here :-)
I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question in
a separate mail.

Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we
should definitely follow the standard (batch) SQL syntax.
In my opinion, the Table API does not necessarily have to be as close as
possible to SQL but should try to make a few things easier and also safer
(easier is of course subjective).

- GroupBy without windows: These are currently intentionally not supported
and also not part of FLIP-11. Our motivation for not supporting this is to
guard the user from defining a query that fails when being executed due to
a very memory-consuming operation. FLIP-11 provides a way to define such a
query as a sliding row window with unbounded preceding rows. With the
upcoming SQL proposal, queries that consume unbounded memory should be
identified and rejected. I would be in favor of allowing groupBy without
windows once the guarding mechanisms are in place.

- GroupBy with window: I think this is a question of taste. Having a
window() call makes the feature more explicit in my opinion. However, I'm
not opposed to moving the windows into the groupBy clause.
Implementation-wise it should be easy to move the window definition into the
groupBy clause for the Scala Table API. For the Java Table API we would
need to extend the parser quite a bit because windows would need to be
defined as Strings and not via objects.

- RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
(implemented by PostgreSQL and Calcite), which allows "reusable"
window definitions. I think this is a desirable feature. In the FLIP-11
proposal the over() clause in select() refers to the predefined windows
with aliases. In case only one window is defined, the over() clause is
optional and the same (and only) window is applied to all aggregates. I
think we can make the over() call mandatory to have the windowing more
explicit. It should also be possible to extend the over clause to directly
accept RowWindows instead of window aliases. I would not make this a
priority at the moment, but rather a feature that could be added later,
because rowWindow() and over() cover all cases. As with GroupBy with
windows, we would need to extend the parser for the Java Table API though.

Finally, I have a suggestion of my own:
In FLIP-11, groupBy() is used to define the partitioning of RowWindows. I
think this should be changed to partitionBy() because groupBy() groups data
and applies an aggregation to all rows of a group, which is not happening
here. In standard SQL, the OVER clause features a PARTITION BY clause. We
are moving this out of the window definition, i.e., the OVER clause, to
enforce the same partitioning for all windows (different partitionings
would be a challenge to execute in a parallel system).
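
The difference between grouping and partitioning described above can be shown
with a small sketch on plain Scala collections. It is not Table API code; the
Order class and both helper functions are hypothetical. groupedSum collapses
each group into one result row, while runningSum keeps one result row per
input row, which corresponds to a row window with unbounded preceding rows.

```scala
object GroupVsPartition {
  final case class Order(user: String, rowtime: Long, amount: Long)

  // groupBy + aggregate: one result row per group.
  def groupedSum(rows: Seq[Order]): Map[String, Long] =
    rows.groupBy(_.user).map { case (u, os) => u -> os.map(_.amount).sum }

  // partitionBy + row window (unbounded preceding up to the current row):
  // one result row per input row, carrying a running sum per partition.
  def runningSum(rows: Seq[Order]): Seq[(Order, Long)] =
    rows.groupBy(_.user).toSeq.sortBy(_._1).flatMap { case (_, os) =>
      val sorted = os.sortBy(_.rowtime)
      val sums = sorted.scanLeft(0L)(_ + _.amount).tail // drop the initial 0
      sorted.zip(sums)
    }
}
```

Note that runningSum only needs the running total per partition as state,
whereas an ungrouped, unwindowed aggregation has no natural point at which it
can emit and discard its state.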

@Timo and all: What do you think about:

- moving windows into the groupBy() call
- making over() mandatory for rowWindow() with a single window definition
- additionally allowing window definitions in over()
- using partitionBy() instead of groupBy() for row windows?

Best, Fabian

2016-10-13 11:10 GMT+02:00 Zhangrucong :

> Hi shaoxuan:
>
> I think the StreamSQL must be executed in a table environment. So I call
> this the Table API's StreamSQL. What do you call this, stream Table API or
> StreamSQL? It is fu
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tblEnv = TableEnvironment.getTableEnvironment(env)
> // map each input line to a (userID, count, num) tuple before registering it
> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>   .map(f => (f, 1L, 1L))
> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>
> So in my opinion, the grammar which is marked red should be compatible
> with Calcite's StreamSQL grammar.
>
> By the way, thanks very much for telling me about the changes in Flink
> StreamSQL. I will look at the new proposal.
>
> Thanks!
> From: Sean Wang [mailto:wshaox...@gmail.com]
> Sent: October 13, 2016 16:29
> To: dev@flink.apache.org; Zhangrucong
> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>
> Hi  zhangrucong,
> I am not sure what you mean by "table API's StreamSQL". I guess you mean
> "stream Table API"?
> The Table API should be compatible with Calcite SQL. (By compatible, my
> understanding is that both the Table API and SQL will be translated to the
> same logical plan - the same set of REL and REX.)
> BTW, please note that we recently merged a change to remove the STREAM
> keyword from Flink stream SQL (FLINK-4546). In our opinion, batch and stream
> do not necessarily need to be differentiated at the SQL level. The major
> difference between batch and stream is "WHEN and HOW to emit the result".
> We have been working with Fabian on a new proposal for this change. I guess
> it will be sent out for review very soon.
>
> Regards,
> Shaoxuan
>
>
> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong