Re: FW: [DISCUSS] Table API / SQL indicators for event and processing time

2017-03-27 Thread Fabian Hueske
Hi Timo,

I had a look at your branch. Thanks for all the refactoring and work you
put into this.

I like the proposal a lot. I think indicating the time attribute in the
schema is a good idea. I'm not sure if we should support the rowtime
expression for batch tables, though. For batch, any long or timestamp
attribute can be used as a time attribute, so marking a single one as the
time attribute is not really necessary, IMO. If others think that this is
needed to make the batch and stream cases identical, I would be OK with
having it. OTOH, I would not consider the schema to be part of the query.

I think we should continue with this work, as it will likely take more time
until we can merge it into master.
@Timo: would the next step be to open a PR for this?

Best, Fabian



RE: FW: [DISCUSS] Table API / SQL indicators for event and processing time

2017-03-20 Thread Radu Tudoran
Hi,

I am not sure whether it is only about setting the timestamp within the query, but
you can imagine examples where you have different timestamps, as
mentioned. Take for example the case when we make a purchase online. You have:
-time of purchase (when the payment was input/triggered)
-time of executing the transaction at the bank (when the payment is processed from
the account)
-time of settlement (when the payment is executed at the merchant bank, i.e. when
the money is received by the seller)

In such a scenario you can imagine that over the same stream of online payments
you might want to run different queries, each driven by one of
these times. Supporting such a scenario would mean that we have one input
stream that enters the Flink engine via a table source, and then we can run
different queries against it:
e.g. SELECT SUM(amount) ORDER BY rowtime(time_purchase) LIMIT 10   //you want
the amount over your last 10 orders
e.g. SELECT SUM(amount) ORDER BY rowtime(time_settlement) LIMIT 10 //you want
the amount over your last 10 settlements
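The two queries above can be sketched without any engine. The following is a framework-free Python toy (not the Flink Table API; the field names and the "last N events" semantics are illustrative assumptions) showing how a single payment stream can feed aggregations driven by different time fields:

```python
# One event stream, several candidate time fields, and a per-query choice of
# which field drives the ordering. Purely illustrative, not Flink code.

def sum_last_n(events, time_field, n):
    """Sum 'amount' over the n most recent events, ordered by the chosen time field."""
    ordered = sorted(events, key=lambda e: e[time_field])
    return sum(e["amount"] for e in ordered[-n:])

payments = [
    {"amount": 10, "time_purchase": 100, "time_settlement": 300},
    {"amount": 20, "time_purchase": 200, "time_settlement": 150},
    {"amount": 30, "time_purchase": 50,  "time_settlement": 400},
]

# Same stream, two "queries" driven by different event-time attributes:
by_purchase   = sum_last_n(payments, "time_purchase", 2)    # -> 30
by_settlement = sum_last_n(payments, "time_settlement", 2)  # -> 40
```

The point of the sketch is only that the choice of ordering attribute changes the result over the identical input, which is why a single hard-wired rowtime can be limiting for such schemas.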

Best regards,



Re: FW: [DISCUSS] Table API / SQL indicators for event and processing time

2017-03-20 Thread Timo Walther
Yes, you are right. In the current design the user cannot assign
timestamps and watermarks in a table program. Operators (such as windows)
might adapt the metatimestamp; if this is the case, this adaptation might
need to be expressed in the query itself too.


E.g. for a tumbling window we could limit the select part to
table.select('rowtime.ceil(DAY) as 'newRowtime) (so that the logical rowtime
matches the physical metatimestamp)
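Numerically, what 'rowtime.ceil(DAY) could compute can be sketched as follows. This is a framework-free Python snippet, not the Flink API; the epoch-millisecond granularity is an assumption:

```python
# Round an epoch-millisecond timestamp up to the next day boundary, so the
# logical rowtime agrees with a daily tumbling window's end timestamp.
# A timestamp exactly on a day boundary stays unchanged (ceil, not floor+1).
DAY_MS = 24 * 60 * 60 * 1000

def ceil_to_day(ts_ms):
    return ((ts_ms + DAY_MS - 1) // DAY_MS) * DAY_MS

ceil_to_day(1)         # -> 86400000 (next day boundary)
ceil_to_day(86400000)  # -> 86400000 (already on a boundary)
```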


Do you have a good example use case that needs the assignment of rowtime 
within a query?



FW: [DISCUSS] Table API / SQL indicators for event and processing time

2017-03-20 Thread Radu Tudoran
Hi,

As suggested by Timo, I am forwarding this to the mailing list. Sorry for not
having the conversation directly here; I initially thought it might not be of
interest...

@Timo - thanks for the clarification. I get the main point now, which is that
the rowtime is encoded within the metadata of the record. I think this is key.
My view on the matter was maybe a bit outdated, in the sense that I saw the
processing pipeline as an input source (as you exemplify, a table scan) and
from there you have a timestamp and watermark assigner before the processing
actually starts. So by overriding the timestamp extractor you match the field
that carries the eventtime/rowtime with the mechanism from Flink. But as far as
I understand this would not be the case anymore... am I right? In case the
assignment of the rowtime to the metadata of the record is done differently,
what would be the way to do it?


Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Timo Walther [mailto:twal...@apache.org] 
Sent: Monday, March 20, 2017 12:29 PM
To: Radu Tudoran
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time

You are not bothering me; it is very interesting to compare the design
with real-world use cases.

In your use case we would create a table like: tEnv.toTable('date, 'time1,
'time2, 'data, 'myrowtime.rowtime)

We would not "overwrite" an actual attribute of the record but only add a
logical "myrowtime". In general, just to make it clear again, the
rowtime must be in the metatimestamp of the record (by using a timestamp
extractor before). The Table API assumes that records that enter the
Table API are timestamped correctly. So in your use case, you would
create your own TableSource, extract the timestamp based on your 3 time
fields, and define an attribute that represents the rowtime logically. In
the current design we want the Table API to rely on Flink's time
handling, because time handling can be very tricky. So we only support
one event-time field.
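The source-side contract described here can be sketched in plain Python (not the Flink TableSource API; the field names and the max-of-fields choice are illustrative assumptions): the extractor runs once when records enter, stamping each record with a single metatimestamp, and downstream operators only ever see that one logical rowtime.

```python
# Stamp each incoming record with one metatimestamp chosen by an
# application-supplied extractor; the "table" side then refers only
# to the single logical 'rowtime' attribute.

def stamp(record, extractor):
    stamped = dict(record)           # leave the original record untouched
    stamped["rowtime"] = extractor(record)
    return stamped

raw = {"time1": 120, "time2": 90, "data": "x"}
# The application decides, at the source, which field (or combination)
# is event time -- here, the later of the two candidate fields:
row = stamp(raw, lambda r: max(r["time1"], r["time2"]))
# row["rowtime"] == 120; queries refer only to 'rowtime' from here on.
```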

But would it be possible to post our discussion on the ML? It might be
interesting for others as well. If yes, can you forward our conversation
to the ML?

Timo



Am 20/03/17 um 12:11 schrieb Radu Tudoran:
> Thanks for the replies.
>
> Regarding "It might be sometimes that this is not explicit to be guessed":
> "That is why I added the RelTimeConverter. After this conversion step it
> should be as explicit as possible (by using the special types). And we can
> add special handling of functions (i.e. ceil) that preserve the monotonicity."
>
> ..maybe I am missing something, so sorry if I am bothering you for nothing
> (it is just to make sure we think of all cases beforehand). I have seen
> examples of applications where you have multiple fields of the same type. For
> example, an event can have 3 time fields of TIMESTAMP, 1 of DATE, and 2 of
> TIME (this is actually from a real application with some sort of standard
> communication schema). I was referring to such cases: it is unclear to me how
> the code will identify the exact field to use as rowtime. This is what I
> meant about how we are passing indicators to spot the rowtime field, as well
> as what would happen with the code in such a situation, since it can identify
> multiple time fields.
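The ambiguity Radu describes can be made concrete with a small Python sketch (not Flink internals; the column names and type strings are illustrative assumptions): with several TIMESTAMP-typed columns, the rowtime cannot be inferred and must be marked explicitly.

```python
# Resolve which column is the rowtime: unambiguous only if there is exactly
# one TIMESTAMP column, otherwise an explicit indicator is required.

def resolve_rowtime(schema, explicit=None):
    """schema: {column: type}. Return the rowtime column, or raise if ambiguous."""
    candidates = [c for c, t in schema.items() if t == "TIMESTAMP"]
    if explicit is not None:
        if explicit not in candidates:
            raise ValueError(f"{explicit} is not a TIMESTAMP column")
        return explicit
    if len(candidates) == 1:
        return candidates[0]
    raise ValueError(f"ambiguous rowtime, mark one of {candidates} explicitly")

# Three TIMESTAMP columns, as in the communication schema described above:
schema = {"t_purchase": "TIMESTAMP", "t_exec": "TIMESTAMP",
          "t_settle": "TIMESTAMP", "d": "DATE", "amount": "BIGINT"}
# resolve_rowtime(schema)              -> raises: ambiguous
# resolve_rowtime(schema, "t_settle")  -> "t_settle"
```

This mirrors why the design needs an explicit marker (like 'myrowtime.rowtime) rather than type-based inference.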