Re: FW: [DISCUSS] Table API / SQL indicators for event and processing time
Hi Timo,

I had a look at your branch. Thanks for all the refactoring and the work you put into this. I like the proposal a lot. I think indicating the time attribute in the schema is a good idea.

I'm not sure whether we should support the rowtime expression for batch tables, though. For batch, any long or timestamp attribute can be used as a time attribute, so marking a single one as the time attribute is not really necessary, IMO. If others think that this is needed to make the batch and stream cases identical, I would be OK with having it. OTOH, I would not consider the schema to be part of the query.

I think we should continue with this work, as it is likely to take more time until we can merge it into master.

@Timo: would the next step be to open a PR for this?

Best, Fabian

2017-03-20 14:15 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> Hi,
>
> I am not sure it is only about setting the timestamp within the query;
> you can imagine that there are examples where you have different
> timestamps, as mentioned. Take for example the case when we make a
> purchase online. You have:
> - time of purchase (when the payment was input/triggered)
> - time of executing the transaction at the bank (when the payment is
>   processed from the account)
> - time of settlement (when the payment is executed at the merchant
>   bank, i.e. when the money is received by the seller)
>
> In such a scenario you can imagine that over the same stream of online
> payments you might want to run different queries, each driven by one
> of these times. Supporting such a scenario would mean that we have one
> input stream that enters the Flink engine via a table source, and then
> we can start running different queries against it:
> e.g. SELECT SUM(amount) ORDER BY rowtime(time_purchase) LIMIT 10
>      // you want the amount over your last 10 orders
> e.g. SELECT SUM(amount) ORDER BY rowtime(time_settlement) LIMIT 10
>      // you want the amount over your last 10 incoming payments
>
> Best regards,
>
> -----Original Message-----
> From: Timo Walther [mailto:twal...@apache.org]
> Sent: Monday, March 20, 2017 2:05 PM
> To: dev@flink.apache.org
> Subject: Re: FW: [DISCUSS] Table API / SQL indicators for event and
> processing time
>
> Yes, you are right. In the current design the user cannot assign
> timestamps and watermarks in a table program. Operators (such as
> windows) might adapt the metatimestamp; if this is the case, this
> adaptation might need to be expressed in the query itself, too.
>
> E.g. for a tumbling window we could limit the select part to
> table.select('rowtime.ceil(DAY) as 'newRowtime) (so that the logical
> rowtime matches the physical metatimestamp).
>
> Do you have a good example use case that needs the assignment of
> rowtime within a query?
>
> Am 20/03/17 um 13:39 schrieb Radu Tudoran:
> > Hi,
> >
> > As suggested by Timo, I am forwarding this to the mailing list. Sorry
> > for not having the conversation directly here; I initially thought it
> > might not be of interest...
> >
> > @Timo: thanks for the clarification. I get the main point now, which
> > is that the rowtime is encoded within the metadata of the record. I
> > think this is key. My view on the matter was maybe a bit outdated, in
> > the sense that I saw the processing pipeline as an input source (as
> > you exemplify, a table scan), and from there you have a timestamp and
> > watermark assigner before the processing actually starts. So by
> > overriding the timestamp extractor you match the field that carries
> > the event time/rowtime with the mechanism from Flink. But as far as I
> > understand, this would no longer be the case... am I right? If the
> > assignment of the rowtime to the metadata of the record is done
> > differently, what would be the way to do it?
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert
> > IT R&D Division
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudo...@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > This e-mail and its attachments contain confidential information from
> > HUAWEI, which is intended only for the person or entity whose address
> > is listed above. Any use of the information contained herein in any
> > way (including, but not limited to, total or partial disclosure,
> > reproduction, or dissemination) by persons other than the intended
> > recipient(s) is prohibited. If you receive this e-mail in error,
> > please notify the sender by phone or email immediately and delete it!
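The multi-timestamp payment scenario discussed above can be sketched outside Flink. This is a minimal plain-Python illustration, not the Flink Table API: the record type, field names, and the `sum_last_10` helper are invented here, and the `rowtime(...)` selection from the hypothetical SQL is modeled as an ordinary key function.

```python
# Hypothetical sketch (plain Python, not Flink): one payment record carries
# three candidate event-time fields, and the same data can be queried
# "driven by" any one of them, mirroring
#   SELECT SUM(amount) ORDER BY rowtime(time_purchase) LIMIT 10
from dataclasses import dataclass

@dataclass
class Payment:
    amount: int
    time_purchase: int     # all times as epoch millis
    time_bank: int
    time_settlement: int

def sum_last_10(payments, rowtime):
    """Sum the amounts of the 10 most recent payments, where "recent"
    is defined by a caller-chosen time attribute."""
    return sum(p.amount for p in sorted(payments, key=rowtime)[-10:])

# Contrived data: settlement order is the reverse of purchase order, so the
# choice of time attribute visibly changes the result.
payments = [Payment(i, time_purchase=i, time_bank=i + 5, time_settlement=1000 - i)
            for i in range(1, 21)]

sum_last_10(payments, lambda p: p.time_purchase)     # amounts 11..20 -> 155
sum_last_10(payments, lambda p: p.time_settlement)   # amounts 1..10  -> 55
```

The point of the sketch is that the aggregation logic is identical in both queries; only the attribute chosen as the time driver differs.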
RE: FW: [DISCUSS] Table API / SQL indicators for event and processing time
Hi,

I am not sure it is only about setting the timestamp within the query; you can imagine that there are examples where you have different timestamps, as mentioned. Take for example the case when we make a purchase online. You have:
- time of purchase (when the payment was input/triggered)
- time of executing the transaction at the bank (when the payment is processed from the account)
- time of settlement (when the payment is executed at the merchant bank, i.e. when the money is received by the seller)

In such a scenario you can imagine that over the same stream of online payments you might want to run different queries, each driven by one of these times. Supporting such a scenario would mean that we have one input stream that enters the Flink engine via a table source, and then we can start running different queries against it:
e.g. SELECT SUM(amount) ORDER BY rowtime(time_purchase) LIMIT 10 // you want the amount over your last 10 orders
e.g. SELECT SUM(amount) ORDER BY rowtime(time_settlement) LIMIT 10 // you want the amount over your last 10 incoming payments

Best regards,

-----Original Message-----
From: Timo Walther [mailto:twal...@apache.org]
Sent: Monday, March 20, 2017 2:05 PM
To: dev@flink.apache.org
Subject: Re: FW: [DISCUSS] Table API / SQL indicators for event and processing time

Yes, you are right. In the current design the user cannot assign timestamps and watermarks in a table program. Operators (such as windows) might adapt the metatimestamp; if this is the case, this adaptation might need to be expressed in the query itself, too.

E.g. for a tumbling window we could limit the select part to table.select('rowtime.ceil(DAY) as 'newRowtime) (so that the logical rowtime matches the physical metatimestamp).

Do you have a good example use case that needs the assignment of rowtime within a query?

Am 20/03/17 um 13:39 schrieb Radu Tudoran:
> Hi,
>
> As suggested by Timo, I am forwarding this to the mailing list. Sorry for not having the conversation directly here; I initially thought it might not be of interest...
>
> @Timo: thanks for the clarification. I get the main point now, which is that the rowtime is encoded within the metadata of the record. I think this is key. My view on the matter was maybe a bit outdated, in the sense that I saw the processing pipeline as an input source (as you exemplify, a table scan), and from there you have a timestamp and watermark assigner before the processing actually starts. So by overriding the timestamp extractor you match the field that carries the event time/rowtime with the mechanism from Flink. But as far as I understand, this would no longer be the case... am I right? If the assignment of the rowtime to the metadata of the record is done differently, what would be the way to do it?
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>
> -----Original Message-----
> From: Timo Walther [mailto:twal...@apache.org]
> Sent: Monday, March 20, 2017 12:29 PM
> To: Radu Tudoran
> Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
>
> You are not bothering me, it is very interesting to compare the design with real world use cases.
>
> In your use case we would create a table like: tEnv.toTable('date, 'time1, 'time2, 'data, 'myrowtime.rowtime)
>
> We would not "overwrite" an actual attribute of the record but only add a logical "myrowtime". In general, just to make it clear again, the rowtime must be in the metatimestamp of the record (by using a timestamp extractor before). The Table API assumes that records that enter the Table API are timestamped correctly. So in your use case, you would create your own TableSource, extract the timestamp based on your 3 time fields, and define an attribute that represents the rowtime logically.
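Timo's point that "the rowtime must be in the metatimestamp of the record (by using a timestamp extractor before)" can be sketched in plain Python. This is not Flink's actual extractor API; `StampedRecord` and `assign_timestamps` are invented stand-ins for the idea that event time is copied into record metadata once, at the source, and queries never read a payload field to get it.

```python
# Hypothetical sketch (plain Python, not Flink's classes): event time lives
# in record *metadata*, assigned once by a timestamp extractor before the
# Table API sees the record. The logical rowtime attribute then just exposes
# this metatimestamp rather than reading a payload field at query time.
from dataclasses import dataclass

@dataclass
class StampedRecord:
    payload: dict
    meta_timestamp: int = -1   # set by the extractor, not by the query

def assign_timestamps(records, extractor):
    """Stand-in for a timestamp extractor: copy one payload-derived value
    into the record metadata."""
    return [StampedRecord(r.payload, extractor(r.payload)) for r in records]

raw = [StampedRecord({"data": "a", "time_purchase": 100, "time_settlement": 150}),
       StampedRecord({"data": "b", "time_purchase": 120, "time_settlement": 130})]

# Choose which payload field drives event time *once*, at the source.
stamped = assign_timestamps(raw, lambda p: p["time_purchase"])

# Downstream, "rowtime" is the metatimestamp, independent of payload fields.
rowtimes = [r.meta_timestamp for r in stamped]   # [100, 120]
```

A source that wanted settlement-driven time would pass a different extractor when building the stream; the table program itself would not change.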
Re: FW: [DISCUSS] Table API / SQL indicators for event and processing time
Yes, you are right. In the current design the user cannot assign timestamps and watermarks in a table program. Operators (such as windows) might adapt the metatimestamp; if this is the case, this adaptation might need to be expressed in the query itself, too.

E.g. for a tumbling window we could limit the select part to table.select('rowtime.ceil(DAY) as 'newRowtime) (so that the logical rowtime matches the physical metatimestamp).

Do you have a good example use case that needs the assignment of rowtime within a query?

Am 20/03/17 um 13:39 schrieb Radu Tudoran:
> Hi,
>
> As suggested by Timo, I am forwarding this to the mailing list. Sorry for not having the conversation directly here; I initially thought it might not be of interest...
>
> @Timo: thanks for the clarification. I get the main point now, which is that the rowtime is encoded within the metadata of the record. I think this is key. My view on the matter was maybe a bit outdated, in the sense that I saw the processing pipeline as an input source (as you exemplify, a table scan), and from there you have a timestamp and watermark assigner before the processing actually starts. So by overriding the timestamp extractor you match the field that carries the event time/rowtime with the mechanism from Flink. But as far as I understand, this would no longer be the case... am I right? If the assignment of the rowtime to the metadata of the record is done differently, what would be the way to do it?
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>
> -----Original Message-----
> From: Timo Walther [mailto:twal...@apache.org]
> Sent: Monday, March 20, 2017 12:29 PM
> To: Radu Tudoran
> Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
>
> You are not bothering me, it is very interesting to compare the design with real world use cases.
>
> In your use case we would create a table like: tEnv.toTable('date, 'time1, 'time2, 'data, 'myrowtime.rowtime)
>
> We would not "overwrite" an actual attribute of the record but only add a logical "myrowtime". In general, just to make it clear again, the rowtime must be in the metatimestamp of the record (by using a timestamp extractor before). The Table API assumes that records that enter the Table API are timestamped correctly. So in your use case, you would create your own TableSource, extract the timestamp based on your 3 time fields, and define an attribute that represents the rowtime logically.
>
> In the current design we want the Table API to rely on Flink's time handling, because time handling can be very tricky. So we only support one event-time field.
>
> But would it be possible to post our discussion on the ML? It might be interesting for others as well. If yes, can you forward our conversation to the ML?
>
> Timo
>
> Am 20/03/17 um 12:11 schrieb Radu Tudoran:
> > Thanks for the replies.
> >
> > Regarding "'It might be sometimes that this is not explicit to be guessed.' That is why I added the RelTimeConverter. After this conversion step it should be as explicit as possible (by using the special types). And we can add special handling of functions (i.e. ceil) that preserve the monotonicity.":
> >
> > Maybe I am missing something, so sorry if I just bother you for nothing (it is just to make sure we think of all cases beforehand). I saw examples of applications where you have multiple fields of the same type. For example, an event can have 3 time fields of TIMESTAMP, 1 of DATE and 2 of TIME (this is actually from a real application with some sort of standard communication schema). I was referring to such cases, where it is unclear to me how the code will identify the exact field to use as rowtime. This is what I meant about how we are passing indicators to spot the rowtime field, as well as what would happen with the code in such a situation, as it can identify multiple time fields.
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert
> > IT R&D Division
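The remark above about "special handling of functions (i.e. ceil) that preserve the monotonicity" is the property that makes `'rowtime.ceil(DAY)` a valid new rowtime. A minimal plain-Python sketch (not Flink code; `ceil_day` and the millisecond representation are assumptions for illustration) shows why: ceiling to a coarser unit never reorders timestamps, so the adapted metatimestamp stays consistent with watermarks.

```python
# Hypothetical sketch: ceiling epoch-millis timestamps up to the next day
# boundary is monotonic (order-preserving), which is what lets a tumbling
# window expose 'rowtime.ceil(DAY) as 'newRowtime safely.
DAY_MS = 24 * 60 * 60 * 1000

def ceil_day(ts_ms: int) -> int:
    """Round a timestamp up to the next day boundary (identity on boundaries)."""
    return -(-ts_ms // DAY_MS) * DAY_MS

timestamps = [0, 1, DAY_MS - 1, DAY_MS, DAY_MS + 1]
ceiled = [ceil_day(t) for t in timestamps]

# Monotonicity: t1 <= t2 implies ceil_day(t1) <= ceil_day(t2), so the
# ceiled sequence of an ordered input is still ordered.
assert ceiled == sorted(ceiled)
```

A non-monotonic function (say, extracting only the hour-of-day) would not have this property, which is presumably why only selected functions get the special handling.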
FW: [DISCUSS] Table API / SQL indicators for event and processing time
Hi,

As suggested by Timo, I am forwarding this to the mailing list. Sorry for not having the conversation directly here; I initially thought it might not be of interest...

@Timo: thanks for the clarification. I get the main point now, which is that the rowtime is encoded within the metadata of the record. I think this is key. My view on the matter was maybe a bit outdated, in the sense that I saw the processing pipeline as an input source (as you exemplify, a table scan), and from there you have a timestamp and watermark assigner before the processing actually starts. So by overriding the timestamp extractor you match the field that carries the event time/rowtime with the mechanism from Flink. But as far as I understand, this would no longer be the case... am I right? If the assignment of the rowtime to the metadata of the record is done differently, what would be the way to do it?

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division

-----Original Message-----
From: Timo Walther [mailto:twal...@apache.org]
Sent: Monday, March 20, 2017 12:29 PM
To: Radu Tudoran
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time

You are not bothering me, it is very interesting to compare the design with real world use cases.

In your use case we would create a table like: tEnv.toTable('date, 'time1, 'time2, 'data, 'myrowtime.rowtime)

We would not "overwrite" an actual attribute of the record but only add a logical "myrowtime". In general, just to make it clear again, the rowtime must be in the metatimestamp of the record (by using a timestamp extractor before). The Table API assumes that records that enter the Table API are timestamped correctly. So in your use case, you would create your own TableSource, extract the timestamp based on your 3 time fields, and define an attribute that represents the rowtime logically.

In the current design we want the Table API to rely on Flink's time handling, because time handling can be very tricky. So we only support one event-time field.

But would it be possible to post our discussion on the ML? It might be interesting for others as well. If yes, can you forward our conversation to the ML?

Timo

Am 20/03/17 um 12:11 schrieb Radu Tudoran:
> Thanks for the replies.
>
> Regarding "'It might be sometimes that this is not explicit to be guessed.' That is why I added the RelTimeConverter. After this conversion step it should be as explicit as possible (by using the special types). And we can add special handling of functions (i.e. ceil) that preserve the monotonicity.":
>
> Maybe I am missing something, so sorry if I just bother you for nothing (it is just to make sure we think of all cases beforehand). I saw examples of applications where you have multiple fields of the same type. For example, an event can have 3 time fields of TIMESTAMP, 1 of DATE and 2 of TIME (this is actually from a real application with some sort of standard communication schema). I was referring to such cases, where it is unclear to me how the code will identify the exact field to use as rowtime. This is what I meant about how we are passing indicators to spot the rowtime field, as well as what would happen with the code in such a situation, as it can identify multiple time fields.
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
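The ambiguity Radu raises, i.e. a schema with several time-typed fields where no code can guess the rowtime, can be illustrated with a small sketch. This is plain Python with invented field names and a toy schema representation, not the Table API; it only shows why type information alone is insufficient and an explicit marker (as in Timo's `'myrowtime.rowtime`) is needed.

```python
# Hypothetical sketch: a schema in the style Radu describes, with several
# TIMESTAMP-typed fields. By type alone, any of them (plus DATE/TIME fields)
# could serve as the time attribute, so the rowtime must be declared, not
# inferred. Field names and the dict-based schema are illustrative only.
schema = {
    "time_created":   "TIMESTAMP",
    "time_processed": "TIMESTAMP",
    "time_settled":   "TIMESTAMP",
    "trade_date":     "DATE",
    "amount":         "BIGINT",
}

def candidate_time_fields(schema):
    """All fields that could, by type alone, serve as a time attribute."""
    return [name for name, typ in schema.items()
            if typ in ("TIMESTAMP", "DATE", "TIME")]

# Type information leaves four candidates: ambiguous.
assert len(candidate_time_fields(schema)) == 4

# An explicit designation resolves the ambiguity; it cannot be inferred.
rowtime_field = "time_created"
assert rowtime_field in candidate_time_fields(schema)
```

This mirrors the proposal's answer: the indicator lives in the table schema (an explicit rowtime attribute), so the planner never has to pick among equally typed fields.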