RE: RowSerializer

2018-02-26 Thread Radu Tudoran
Hi,

Thanks for the confirmation. 
In this case, should we just leave things as they are, so that anyone 
interested in consistency checks can build the wrapper you mention?
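
For anyone who does want such a check, here is a minimal sketch of the wrapping 
idea (a plain length-prefix plus CRC32 frame around the serialized bytes, 
independent of Flink's TypeSerializer interface; all names are hypothetical):

import java.io.{DataInputStream, DataOutputStream, IOException}
import java.util.zip.CRC32

// Sketch: frame every record as [length][crc32][payload] so that truncated or
// corrupted bytes are detected on read instead of silently yielding a
// default-valued Row.
object ChecksumFraming {

  def write(out: DataOutputStream, payload: Array[Byte]): Unit = {
    val crc = new CRC32()
    crc.update(payload)
    out.writeInt(payload.length)
    out.writeLong(crc.getValue)
    out.write(payload)
  }

  def read(in: DataInputStream): Array[Byte] = {
    val length = in.readInt()
    val expected = in.readLong()
    val payload = new Array[Byte](length)
    in.readFully(payload) // throws EOFException on truncated input
    val crc = new CRC32()
    crc.update(payload)
    if (crc.getValue != expected) {
      throw new IOException("checksum mismatch: record is corrupted")
    }
    payload
  }
}

The payload would be the bytes produced by RowSerializer#serialize and would be 
handed to RowSerializer#deserialize only after the check passes; as discussed 
below, this costs extra bytes and CPU per record.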


-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Monday, February 26, 2018 9:34 AM
To: dev@flink.apache.org
Subject: Re: RowSerializer

Hi Radu,

none of Flink's serializers adds checksums to ensure data integrity.
It would be possible to implement a wrapping serializer that adds a checksum to 
each record, but that would be at the cost of performance.
Not sure if this is done at some point in Flink, maybe for savepoints.

Best, Fabian

2018-02-23 14:44 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
>
>
> I was using the RowSerializer (package
> org.apache.flink.api.java.typeutils.runtime) recently to serialize Rows to a
> file (for reading them back in the future).
>
> I observed a strange behavior that I would like to double check with
> you in case it is a serious problem that should be addressed:
>
> When the RowSerializer is used to deserialize the data, there is no check
> of the consistency of the data (e.g., size of the serialized object,
> checksum…). As a result, for arbitrary byte ranges it can happen that
> inconsistent objects are deserialized, which of course can lead to
> inconsistent data.
>
>
>
> For example, if we serialize an object of the form (Int, Long, Double,
> String, String) and only 1/3 of the bytes are available, we can end up
> reading back an object such as (0,0,0,null, null) – and this is not the
> only way the object can be incorrectly deserialized – rather than
> getting an error during deserialization.
>
>
>
> Hence, I wanted to double check whether this behavior is intended for
> some reason, or whether we should consider fixing the RowSerializer to
> guarantee the integrity of the deserialized objects.
>
>
>
> Best regards,
>
>
>
> Dr. Radu Tudoran
>
> Staff Research Engineer - Big Data Expert
>
> IT R&D Division
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> German Research Center
>
> Munich Office
>
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
>
> This e-mail and its attachments contain confidential information from 
> HUAWEI, which is intended only for the person or entity whose address 
> is listed above. Any use of the information contained herein in any 
> way (including, but not limited to, total or partial disclosure, 
> reproduction, or dissemination) by persons other than the intended 
> recipient(s) is prohibited. If you receive this e-mail in error, 
> please notify the sender by phone or email immediately and delete it!
>
>
>


RowSerializer

2018-02-23 Thread Radu Tudoran
Hi,

I was using the RowSerializer (package 
org.apache.flink.api.java.typeutils.runtime) recently to serialize Rows to a 
file (for reading them back in the future).
I observed a strange behavior that I would like to double check with you in 
case it is a serious problem that should be addressed:
When the RowSerializer is used to deserialize the data, there is no check of the 
consistency of the data (e.g., size of the serialized object, checksum...). As a 
result, for arbitrary byte ranges it can happen that inconsistent objects are 
deserialized, which of course can lead to inconsistent data.

For example, if we serialize an object of the form (Int, Long, Double, String, 
String) and only 1/3 of the bytes are available, we can end up reading back an 
object such as (0,0,0,null, null) - and this is not the only way the object can 
be incorrectly deserialized - rather than getting an error during 
deserialization.

Hence, I wanted to double check whether this behavior is intended for some 
reason, or whether we should consider fixing the RowSerializer to guarantee the 
integrity of the deserialized objects.

Best regards,

Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!



running flink 1.4 in eclipse

2017-10-12 Thread Radu Tudoran
Hi,

Any thoughts on why I would get such an error when trying to run a simple 
(i.e., any) Flink program from within Eclipse?

...I did start the local cluster before - where I was able to run the job - but 
after that, running Flink from Eclipse did not work anymore.

Exception in thread "main" java.lang.NoSuchFieldError: WEB_ARCHIVE_COUNT
   at 
org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2535)
   at 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startJobManager(LocalFlinkMiniCluster.scala:143)
   at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:328)
   at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:322)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at scala.collection.immutable.Range.foreach(Range.scala:141)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:322)
   at 
org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:313)
   at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:116)
   at 
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
   at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
   at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:921)
   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
   at org.bdcourse.BatchJob.main(BatchJob.java:60)





RE: [DISCUSS] Table API / SQL internal timestamp handling

2017-07-27 Thread Radu Tudoran
Hi all,

@Shaoxuan - thanks for the remarks. I have a question regarding your 
suggestion not to allow creating proctime windows over a regular column. I 
think this would be useful, though. First, you might need to carry the 
timestamp of when the processing happened (for logging, provenance, 
traceability ...). Secondly, I do not think it contradicts the semantics of 
batch SQL, as SQL has the function "now()", which carries pretty much the same 
semantics as having a function that marks the proctime and then projects it 
into a column. If I am not mistaken, you can store the result of calling now() 
in a database column.
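
To make that concrete: in batch SQL one can write something like

  SELECT payment_id, amount, now() AS proc_ts FROM Payments

(table and column names made up for illustration), and the value of now() is 
simply projected as a regular column of the result. The question for the 
streaming case is whether the proctime should be exposed in the same way.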


Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang 
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Shaoxuan Wang [mailto:shaox...@apache.org] 
Sent: Thursday, July 27, 2017 6:00 AM
To: Dev
Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling

 Hi Everyone,
I like this proposal. The problem we used to have is that we treated the 
eventtime column as a special timestamp column. An eventtime column is nothing 
other than a regular column, but with a certain flag (eventtime-indicator) 
indicating that this column can be used as an eventtime to decide when a 
bounded query can emit the final result, by comparing it with the associated 
watermark.

I have a few comments to add on top of this (they may have already been 
addressed in the conversation — since it’s a long discussion, I may have missed 
something):

   1. While we remove the timestamp column, we introduce an eventtime-indicator
   (we may already have this concept). It is only a flag that can be applied to
   any column (note that some types may not be usable as an eventtime
   column), indicating whether this column can be used as eventtime or not. This
   flag is useful for validation and codeGen.
   2. A column that has been used as an eventtime should not lose its own
   type. We should not cast all eventtime columns to the timestamp type. For
   instance, if a column is a long type, it will stay a long type even if a
   window aggregate has used it as an eventtime.
   3. Eventtime will only work well with some associated watermark
   strategy. We may consider forcing the user to provide watermark logic for
   his/her selected eventtime.
   4. For proctime, I hope we do not introduce a proctime-indicator for
   regular columns. Ideally we should not allow the user to create a proctime
   window on a regular column, as this is against the batch query semantics.
   Therefore I suggest we always introduce a proctime timestamp column for users
   to create proctime windows on. And unlike eventtime, proctime does not need
   any associated watermark strategy, as there is no out-of-order issue for
   proctime.

Regards,
Shaoxuan

On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Thanks everybody for the replies so far.
>
> Let me answer your questions and reply to your thoughts:
>
> Radu:
> ---
> First of all, although my proposal is motivated by a join operator, 
> this discussion is about timestamp handling, not about joins in general.
>
> - The semantics of outer joins is to emit null and there is no way 
> around that. This is not an issue for us. Actually, outer joins are 
> supported by the batch SQL / Table API. It is true that outer joins 
> might result in null timestamps. Calcite will mark those fields as 
> nullable and we should check that timestamps which are used in windows or 
> joins are not nullable.
> - The query has to explicitly specify which timestamp attribute to use.
> Otherwise its semantics are not complete and it is invalid. A 
> group-window that follows a join will reference a timestamp attribute 
> and this will be used. The other timestamp might be projected out. 
> When a result with two timestamps is converted into a DataStream, the 
> user ha

RE: [DISCUSS] Table API / SQL internal timestamp handling

2017-07-26 Thread Radu Tudoran
Hi everyone,

I just want to add that I was referring to NULL values not specifically for 
time fields but for the event itself. If you have the following situation

Stream 1:  |event1   | 
Stream 2:  | | 

and you have a LEFT JOIN between stream 1 and stream 2 (no condition), then 
you still need to emit (event1, null) ... as this is the behavior of a left join. 
This is maybe a very simple situation, but the point is that left joins and 
right joins can have situations where you have elements only in the main stream 
and no element in the right stream. And for this case you still need to emit.


Regarding whether time should be decided by the system or not... I think the 
answer is: it depends. I think the example from Jark is very good and shows the 
need for some mechanism to select/manage the time (I like the proposal of 
having functions to insert the time in the output!). However, if a business 
analyst writes a query without explicit time management, we still need some 
default behavior in the system. As per my initial proposal, I think we need to 
decide on one timestamp field to carry (either a new one created at the moment 
of the join, or the timestamp from the main stream ... although I am not sure 
which one is the main stream in the case of a full join :) )


Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang 
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Jark Wu [mailto:j...@apache.org] 
Sent: Wednesday, July 26, 2017 8:29 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling

Hi Xingcan,

IMO, I don't think the event-time of join results can be automatically decided 
by the system. Considering batch tables, if users want an event time window 
aggregation after a join, the user must specify the time field explicitly 
(T1.rowtime or T2.rowtime or a result computed from them). So in the case of 
streaming tables, the system also can't automatically decide the time field for 
users.

With regard to the question you asked, I think we don't need to change the 
watermark no matter whether we choose the left rowtime, the right rowtime, or a 
combination, because the watermark has already been aligned with the rowtime at 
the source. Maybe I'm wrong about this; please correct me if I'm missing something.

What do you think?

Regards,
Jark

2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:

> Hi all,
>
> @Fabian, thanks for raising this.
>
> @Radu and Jark, personally I think the timestamp field is critical for 
> query processing and thus should be declared as (or supposed to be) 
> NOT NULL. In addition, I think the event-time semantic of the join 
> results should be automatically decided by the system, i.e., we do not 
> hand it over to users so to avoid some unpredictable assignment.
>
> Generally speaking, consolidating different time fields is possible 
> since all of them should ideally be monotonically increasing. From my 
> point of view, the problem lies in
> (1) what's the relationship between the old and new watermarks. Shall 
> they be one-to-one mapping or the new watermarks could skip some 
> timestamps? And (2) who is in charge of emitting the blocked 
> watermarks, the operator or the process function?
>
> I'd like to hear from you.
>
> Best,
> Xingcan
>
>
>
> On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <j...@apache.org> wrote:
>
> > Hi,
> >
> > Radu's concerns make sense to me, especially the null value 
> > timestamp and multi-proctime.
> >
> > I have also something in my mind. I would like to propose some time 
> > indicator built-in functions, e.g. ROW_TIME(Timestamp ts) will 
> > generate a event time logical attribute, PROC_TIME() will generate a 
> > processing time logical attribute. It is similar to TUMBLE_ROWTIME 
> > proposed in this PR https://github.com/apache/

RE: [DISCUSS] Table API / SQL internal timestamp handling

2017-07-25 Thread Radu Tudoran
Hi,

I think this is an interesting discussion and I would like to add some issues 
and give some feedback.

- To support the join we not only need to think about the time but also about 
null values. For example, if you have a LEFT (or RIGHT) JOIN between items of 2 
input streams and the secondary input is not available, you should still emit 
Row.of(event1, null)... as far as I know, serializing/deserializing null values 
in order to send them does not work, so we should include this scenario in the 
discussion.
- If we have multiple timestamps in an (output) event, one question is how to 
select afterwards which one is the primary time field to operate on. When we 
describe a query we might be able to specify this (or we get it implicitly if we 
implement the carry-on of the 2 timestamps), e.g. Select T1.rowtime, T2.rowtime 
... but if the output of a query is the input of a new processing pipeline, do 
we also generally support an input that has 2 time fields? How do we deal with 
the 2 time fields (maybe I am missing something) further down the DataStream 
pipeline that we build based on the output?
- For the case of proctime: do we need to carry 2 proctimes (the proctimes of 
the incoming events from each stream), or 1 proctime (as we operate on proctime 
and the combination of the 2 inputs can be considered a new event, the current 
proctime on the machine can be considered the (proc)time reference for the 
output event), or 3 proctimes (the 2 proctimes of the inputs plus the proctime 
when the new event was created)?
- Similar to the point above, for event time (which I understand as the time 
when the event was created... or do we understand it as a time carried within 
the event?): when we join 2 events and output an event that is the result of 
the join, isn't this a new event detached from the source/input events? I would 
tend to say it is a new event, and then, as for proctime, the event time of the 
new event is the current time when this output event was created. If we accept 
this hypothesis, we would not need the 2 input time fields to be 
carried/managed implicitly. If someone needs them further down the computation 
pipeline, they would be selected explicitly in the query from the input stream 
and projected into some fields to be carried (Select T1.rowtime as FormerTime1, 
T2.rowtime as FormerTime2, ... JOIN T1, T2 ...), but they would not have the 
timestamp logic.

..my 2 cents




Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang 
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Tuesday, July 25, 2017 4:22 PM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL internal timestamp handling

Hi everybody,

I'd like to propose and discuss some changes in the way how the Table API / SQL 
internally handles timestamps.

The Table API is implemented on top of the DataStream API. The DataStream API 
hides timestamps from users in order to ensure that timestamps and watermarks 
are aligned. Instead users assign timestamps and watermarks once (usually at 
the source or in a subsequent operator) and let the system handle the 
timestamps from there on. Timestamps are stored in the timestamp field of the 
StreamRecord which is a holder for the user record and the timestamp. 
DataStream operators that depend on time (time-windows, process function, ...) 
access the timestamp from the StreamRecord.

In contrast to the DataStream API, the Table API and SQL are aware of the 
semantics of a query. I.e., we can analyze how users access timestamps and 
whether they are modified or not. Another difference is that the timestamp must 
be part of the schema of a table in order to have correct query semantics.

The current design to handle timestamps is as follows. The Table API stores 
timestamps in the timestamp field of the StreamRecord. Therefore, timestamps 
are detached from the remaining data which is stored in Row

RE: ListState to List

2017-05-18 Thread Radu Tudoran
Actually, that was one option that I was considering. I am still a bit fuzzy 
about the advantages and disadvantages of using one type of state over another. 
I know that using ValueState would mean that when getting the value (i.e. a List 
in this case) the whole list would be deserialized at once. This is OK if I need 
to go through all elements anyway. However, I understand that the same holds 
when I need to update the state - the whole list would be serialized and 
re-written instead of a single element. Therefore, if we always want the best 
performance, it seemed to me that it would be worth considering having more 
specialized types of state - hence my proposal.
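
To make the trade-off concrete, here is a minimal sketch (hypothetical names, 
simplified types, assuming a keyed stream) of the read-modify-write cycle when 
the whole list lives in a single ValueState:

import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class BufferAllInValueState extends ProcessFunction[Row, Row] {

  // the whole buffer is a single value: every read deserializes the complete list
  private lazy val buffer: ValueState[JList[Row]] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[JList[Row]]("buffer", classOf[JList[Row]]))

  override def processElement(
      in: Row,
      ctx: ProcessFunction[Row, Row]#Context,
      out: Collector[Row]): Unit = {
    var list = buffer.value()
    if (list == null) {
      list = new JArrayList[Row]()
    }
    list.add(in)        // appending one element ...
    buffer.update(list) // ... still re-serializes and rewrites the entire list
  }
}

A more specialized state type could make the append (or a sorted insert) cheap 
without touching the rest of the list.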



-Original Message-
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
Sent: Thursday, May 18, 2017 11:49 AM
To: dev@flink.apache.org
Subject: Re: ListState to List

Hi Radu,

Why not using a ValueState that inside stored the whole list. 
Whenever you state#get() you get the whole list and you can sort it.

Kostas
 
> On May 18, 2017, at 3:31 AM, Radu Tudoran <radu.tudo...@huawei.com> wrote:
> 
> Hi Aljoscha,
> 
> Thanks for the clarification. I understand that there might be advantages in 
> some cases to not having the List-like interface, while in other scenarios 
> (like the one I described) there aren't. Considering this, why not have 2 
> types of state: ListState and StreamInListState - users would use whichever 
> is more appropriate. What do you think?
> 
> -Original Message-
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Thursday, May 18, 2017 12:15 AM
> To: dev@flink.apache.org
> Subject: Re: ListState to List
> 
> Hi,
> The interface is restrictive on purpose because depending on the state 
> backend it might not be possible to provide a List-like interface. There 
> might be state backends that stream in the list from somewhere else or other 
> restrictions. If we now allowed a more general interface here we would 
> possibly prevent optimisations in the future or make certain implementations 
> very hard to do efficiently.
> 
> Best,
> Aljoscha
> 
>> On 16. May 2017, at 21:56, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>> 
>> Hi,
>> 
>> I would like to work with ListState, more specifically I would need to 
>> access the contents and sort them. For this I would need a collection type 
>> (e.g., a List, an Array...).
>> However, I see that if I have a variable of ListState type, the 
>> only interfaces I have are:
>> state.get -> which returns an Iterable, or state.get.getIterator which 
>> returns an Iterator.
>> 
>> Basically, if I use any of these I now need to copy the contents into an actual 
>> List or Array.  Is there any way to avoid this? ..perhaps there is an 
>> implicit type that I can convert to...
>> 
> 



RE: ListState to List

2017-05-17 Thread Radu Tudoran
Hi Aljoscha,

Thanks for the clarification. I understand that there might be advantages in 
some cases to not having the List-like interface, while in other scenarios (like 
the one I described) there aren't. Considering this, why not have 2 types of 
state: ListState and StreamInListState - users would use whichever is more 
appropriate. What do you think?

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Thursday, May 18, 2017 12:15 AM
To: dev@flink.apache.org
Subject: Re: ListState to List

Hi,
The interface is restrictive on purpose because depending on the state backend 
it might not be possible to provide a List-like interface. There might be state 
backends that stream in the list from somewhere else or other restrictions. If 
we now allowed a more general interface here we would possibly prevent 
optimisations in the future or make certain implementations very hard to to 
efficiently.

Best,
Aljoscha

> On 16. May 2017, at 21:56, Radu Tudoran <radu.tudo...@huawei.com> wrote:
> 
> Hi,
> 
> I would like to work with ListState, more specifically I would need to access 
> the contents and sort them. For this I would need a collection type (e.g., 
> a List, an Array...).
> However, I see that if I have a variable of ListState type, the 
> only interfaces I have are:
> state.get -> which returns an Iterable, or state.get.getIterator which 
> returns an Iterator.
> 
> Basically, if I use any of these I now need to copy the contents into an actual 
> List or Array.  Is there any way to avoid this? ..perhaps there is an 
> implicit type that I can convert to...
> 



ListState to List

2017-05-16 Thread Radu Tudoran
Hi,

I would like to work with ListState, more specifically I would need to access 
the contents and sort them. For this I would need a collection type (e.g., a 
List, an Array...).
However, I see that if I have a variable of ListState type, the 
only interfaces I have are:
state.get -> which returns an Iterable
Or
state.get.getIterator which returns an Iterator

Basically, if I use any of these I now need to copy the contents into an actual 
List or Array.  Is there any way to avoid this? ..perhaps there is an implicit 
type that I can convert to...
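
For reference, the copy that is needed today looks roughly like this (a sketch, 
assuming the state holds Rows and some comparison function is available):

import scala.collection.JavaConverters._

import org.apache.flink.api.common.state.ListState
import org.apache.flink.types.Row

// ListState#get() only exposes an Iterable (possibly null when empty),
// so sorting currently means materializing a copy first.
def sortedContents(state: ListState[Row], lessThan: (Row, Row) => Boolean): List[Row] = {
  val copy = Option(state.get()).map(_.asScala.toList).getOrElse(Nil)
  copy.sortWith(lessThan)
}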



RE: [DISCUSS] Expose State Backend Interface for UDAGG

2017-05-12 Thread Radu Tudoran
Hi,

In general I believe it is a good idea to expose the state backend to the 
functions. You can always optimize the data processing based on the data 
storage. Hence, since at the level of the processing (aggregation here) you 
would be able to control the access to the data, you can implement this in a 
smart way. Moreover, we can also construct different data 
organizations/partitioning strategies/etc. based on the specific computation. I 
understand that this would be quite an effort, but at some point it is worth 
making.

Meanwhile, if it is not possible to have the aggregation function extend the 
rich interface, couldn't we supplement this with some extra logic in the process 
function that provides the aggregates with the needed data, or at least pointers 
to the required state?

As far as I know it would be legal now to have something like:

ProcessFunction() {

  ValueState state = ...

  processElement(newElement) {
    acc.accumulate(newElement, state)
  }
}

WeightedAvgAccum {

  public void accumulate(Row newElement, ValueState state) {
    state.value
  }
}
Would something like this at least partially solve the problem? ...it would 
allow you to manage the intermediate data directly in the state instead of in 
memory.


-Original Message-
From: Shaoxuan Wang [mailto:wshaox...@gmail.com] 
Sent: Friday, May 12, 2017 1:20 PM
To: Dev
Cc: Stephan Ewen
Subject: Re: [DISCUSS] Expose State Backend Interface for UDAGG

Fabian,
Thanks for your quick reply.
The goal of "FLINK-6544" is not to expose state backend in all UDAGG cases.
It is designed to provide an interface which provides an ability for user to 
access state backend when it is allowed (yes, right now this is only allowed by 
ProcessFunction).  This interface itself does not make the things better. 
Instead, it provides a generic interface for the future adoption of exposing 
backend state in all different UDAGG cases, and the current over Aggregate and 
unbounded group aggregate can enjoy the benefits of accessing state backend.

In the meanwhile, I am also curious why we cannot build AggregateFunction on 
RichFunction. We will lose a lot of the benefit of having a state backend for 
window aggregates if it does not provide a runtime context.
@Stephan It would be really appreciated if you could share the concerns or 
blocking reasons for not having AggregateFunction designed on top of 
RichFunction.

Regards,
Shaoxuan


On Fri, May 12, 2017 at 6:21 PM, Fabian Hueske  wrote:

> Hi, thanks for the proposal.
>
> I think exposing state to UDAGGs would be very difficult and a lot of work.
>
> UDAGGs are called from ProcessFunctions (stream, OVER window and 
> non-window aggs), AggregateFunctions (stream, group-window aggs), 
> CombineFunctions
> (batch) and GroupReduceFunctions (batch). The batch functions do not 
> support state backends at all, ProcessFunctions can register state, 
> and AggregateFunction cannot.
> Even when putting the batch case aside this is very hard.
>
> AggregateFunctions support merging of windows. Right now, this only 
> involves merging of accumulators. If we allow AggregateFunctions to 
> have state, we would also need to provide logic to merge the state. 
> Moreover, it is not clearly defined when AggregateFunctions are called 
> (similar to Combiners in MapReduce) which would make state handling very 
> complex.
> Changing this would be a major effort in the DataStream API.
>
> An alternative would be to reimplement the group-window logic in the 
> Table API, but this will be a huge effort as well (maybe we have to do 
> it anyway at some point though).
>
> @Stephan knows more about the implications of allowing state in 
> AggregateFunctions.
>
> Best, Fabian
>
> 2017-05-12 11:53 GMT+02:00 Shaoxuan Wang :
>
> > Hi everyone,
> >
> > We made some progress in the implementation of UDAGG (FLINK-5564).
> However,
> > we realized that there are cases where users may want to use state
> backend
> > to store the data. For instance, the built-in 
> > MaxWithRetractAggFunction currently creates a HashMap to store the 
> > historical data. It will have problems when the # of keys is huge enough, 
> > thereby leading to OOM.
> >
> > In FLINK-6544, we have proposed an approach to expose State Backend 
> > Interface for UDAGG. A brief design doc can be found in
> > https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26
> > nWscLIOn50c/edit
> >
> > I am opening this discussion thread, as I realized there are 
> > recently
> some
> > open jiras which are towards to implement some special aggregators, 
> > such
> as
> > "count distinct". IMO, "count distinct" is just an UDAGG. With the 
> > new proposed FLINK-6544, we can just make it as a built-in agg 
> > without
> changing
> > the current UDAGG framework.
> >
> > @Fabian, @Haohui, @Radu, please take a look at FLINK-6544 and let me 
> > know what you think.
> > Btw, we do not need include this change for release 1.3 in our opinion.
> >
> 

RE: [DISCUSS] Feature Freeze

2017-05-05 Thread Radu Tudoran
Sorry in case I missed something in the discussion of coming up with the list 
of open threads, but there is also 

[FLINK-6075] Support Order By for Stream SQL (Support Limit/Top(Sort) for 
Stream SQL)

...in case you want to consider it (it only needs a review)

Best regards,

-Original Message-
From: Robert Metzger [mailto:rmetz...@apache.org] 
Sent: Friday, May 05, 2017 10:43 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Feature Freeze

I've checked the release status again, and it seems that some more issues made 
it into master, while other very big new features are still being finished 
(incremental checkpointing, retractions, serializer upgrades and automatic 
buffer management).

I'll publish the first, non-voting, testing only RC0 on Monday morning.


Completed Features for 1.3
- TODO after feature freeze

Blockers:
12 blockers (4 documentation, 1 refactoring) --> 8 real blockers (can be fixed 
after the feature freeze)


Really important (open):
- FLINK-6364: Implement incremental checkpointing in RocksDBStateBackend 
(pending PR)
- FLINK-6047: Add support for Retraction in Table API / SQL (depends on
FLINK-6093)
- FLINK-4545: Flink automatically manages TM network buffer (pending PR, under 
review)
- FLINK-6178: Allow upgrades to state serializers (pending PR, no review )


Mentioned in the thread (open):
- FLINK-6033: Support UNNEST query in the stream SQL API (no PR)
- FLINK-6335: Parse DISTINCT over grouped window in stream SQL (pending PR)
- FLINK-6373: Add runtime support for distinct aggregation over grouped windows 
(pending PR, under review)
- FLINK-6281: Create TableSink for JDBC (pending PR, under review)
- FLINK-6225: Support Row Stream for CassandraSink (pending PR, under
review)
- FLINK-6196: Support dynamic schema in Table Function (pending PR, under
review)
- FLINK-4022: Partition and topic discovery for FlinkKafkaConsumer (pending PR, 
no review)
- FLINK-4821: Implement rescalable non-partitioned state for Kinesis Connector 
(pending PR, under review)

Closed:
- FLINK-6377: Support map types in the Table / SQL API (merged)
- FLINK-6398: RowSerializer's duplicate should always return a new instance
(merged)
- FLINK-5998: Un-fat Hadoop from Flink fat jar (merged)
- FLINK-6337: Remove the buffer provider from PartitionRequestServerHandler
(merged)
- FLINK-5892: Recover job state at the granularity of operator (merged)
- FLINK-5906: Add support to register user defined aggregates in 
TableEnvironment (merged)
- FLINK-6334: Refactoring UDTF interface (merged)



On Wed, May 3, 2017 at 11:42 AM, Aljoscha Krettek 
wrote:

> This is the list of blocking issues for Flink 1.3:
> https://issues.apache.org/jira/browse/FLINK-5740?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%201.3.0
>
> Could everyone please update the state of the issues mentioned in this 
> thread to “blocking” and set the version to “1.3.0” if they feel that 
> this is appropriate. Otherwise it is very hard for the release manager 
> to get an overview over the progress of the release. With an 
> up-to-date list it’s also possible to triage issues and discuss on the 
> issue whether it really should be a release blocker.
>
> Best,
> Aljoscha
> > On 2. May 2017, at 22:29, Eron Wright  wrote:
> >
> > Robert, I'd like to see FLINK-5974
> >  (Mesos DNS 
> > support) added to the list of important issues.  A PR is ready.
> >
> > On Tue, May 2, 2017 at 4:30 AM, Kostas Kloudas <
> k.klou...@data-artisans.com>
> > wrote:
> >
> >> The only thing that I want to add in the features to be added for 
> >> 1.3 is the NOT pattern for the CEP library.
> >>
> >> There is an open PR here: https://github.com/apache/flink/pull/3798
> >> which is not far from getting in.
> >>
> >> Kostas
> >>
> >>> On May 2, 2017, at 12:10 PM, Robert Metzger 
> wrote:
> >>>
> >>> Thanks a lot Ufuk for starting the discussion regarding the 1.3 
> >>> feature freeze.
> >>>
> >>> I didn't feature freeze yesterday (Monday) because it was a public
> >> holiday
> >>> here in Germany.
> >>>
> >>> I haven't made up my mind whether to do the feature freeze today 
> >>> or
> not.
> >>> Many important features seem to be close to completion.
> >>> I don't think we can pick features and wait with the release until 
> >>> they
> >> are
> >>> finished. The only thing I can imagine we could do is extend the
> >> deadline a
> >>> little bit to give people more time to complete.
> >>> But I'm strictly against multiple extensions.
> >>>
> >>> To make the discussion a bit easier, 

RE: question about rowtime processfunction - are watermarks needed?

2017-04-28 Thread Radu Tudoran
Hi,
Thanks again Fabian for the explanation. 
Considering what you said - is there still a duality with the batch case? As 
the stream case is non-deterministic, I would say that the duality - in the 
sense that a query on the stream should return the same result as the query on 
the batched data - does not hold anymore?
I am just trying to get a deeper understanding of this, which I think will 
apply also to the other functions and SQL operators... sorry for bothering you 
with this.

-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Friday, April 28, 2017 9:56 AM
To: dev@flink.apache.org
Subject: Re: question about rowtime processfunction - are watermarks needed?

Hi Radu,

yes, that might happen in a parallel setup, and it depends on the "speed" of 
the parallel threads.
An operator only advances its own event-time clock to the minimum of the last 
watermarks received from its input channels.
If one input channel is "slow", the event-time of the operator lags behind, and 
"late" events from the other threads are still processed correctly because the 
operator's event-time has not been advanced yet.

So, event-time is not deterministic when it comes to which records are dropped.
The watermark documentation might be helpful as well [1].

Cheers,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#watermarks-in-parallel-streams

2017-04-27 22:09 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Re-hi,
>
> I debuged a bit the test for the Event rowtime
>
> I tested the testBoundNonPartitionedEventTimeWindowWithRange from 
> SQLITCase class
>
> I would expect that once a watermark is emitted: 1) the onTimer will be 
> called to process the events that arrived so far, and 2) future events 
> that arrive behind it will be dropped. However, it seems that almost the 
> entire input can arrive in the processElement function before onTimer is 
> triggered.
>
> Moreover, if you modify the input to add an out-of-order event (see the 
> dataset below, where after watermark 14000 I added an event with 
> timestamp 1000), I would expect this event to be dropped. However, in 
> different runs it can happen that it is not dropped - basically, onTimer 
> was never triggered before this event arrives, so it gets registered. 
> Is this correct? Am I missing something?
>
>
>@Test
>   def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
> val data = Seq(
>   Left((1500L, (1L, 15, "Hello"))),
>   Left((1600L, (1L, 16, "Hello"))),
>   Left((1000L, (1L, 1, "Hello"))),
>   Left((2000L, (2L, 2, "Hello"))),
>   Right(1000L),
>   Left((2000L, (2L, 2, "Hello"))),
>   Left((2000L, (2L, 3, "Hello"))),
>   Left((3000L, (3L, 3, "Hello"))),
>   Right(2000L),
>   Left((4000L, (4L, 4, "Hello"))),
>   Right(3000L),
>   Left((5000L, (5L, 5, "Hello"))),
>   Right(5000L),
>   Left((6000L, (6L, 6, "Hello"))),
>   Left((6500L, (6L, 65, "Hello"))),
>   Right(7000L),
>   Left((9000L, (6L, 9, "Hello"))),
>   Left((9500L, (6L, 18, "Hello"))),
>   Left((9000L, (6L, 9, "Hello"))),
>   Right(10000L),
>   Left((10000L, (7L, 7, "Hello World"))),
>   Left((11000L, (7L, 17, "Hello World"))),
>   Left((11000L, (7L, 77, "Hello World"))),
>   Right(12000L),
>   Left((14000L, (7L, 18, "Hello World"))),
>   Right(14000L),
>   Left((15000L, (8L, 8, "Hello World"))),
>   Left((1000L, (8L, 8, "Too late - Hello World"))),   // event is out
> of order and should be dropped
>   Right(17000L),
>   Left((20000L, (20L, 20, "Hello World"))),
>   Right(19000L))
>
>
>
>
> -Original Message-
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, April 27, 2017 3:17 PM
> To: dev@flink.apache.org
> Subject: Re: question about rowtime processfunction - are watermarks 
> needed?
>
> Hi Radu,
>
> event-time processing requires watermarks. Operators use watermarks to 
> compute the current event-time.
> The ProcessFunctions for over range windows use the TimerServices to 
> group elements by time.
> In case of event-time, the timers are triggered by the event-time of 
> the operator which is derived from the received watermarks.
> In case of processing-time, the timers are triggered based on the 
> wallclock time of the operator.
>
> So by using event-time timers, we implicitly rely on the watermarks 
> because the timers are triggered ba

RE: question about rowtime processfunction - are watermarks needed?

2017-04-27 Thread Radu Tudoran
Re-hi,

I debuged a bit the test for the Event rowtime

I tested the testBoundNonPartitionedEventTimeWindowWithRange from SQLITCase 
class

I would expect that once a watermark is emitted: 1) the onTimer will be called 
to process the events that arrived so far, and 2) future events that arrive 
behind it will be dropped. However, it seems that almost the entire input can 
arrive in the processElement function before onTimer is triggered. 

Moreover, if you modify the input to add an out-of-order event (see the dataset 
below, where after watermark 14000 I added an event with timestamp 1000), I 
would expect this event to be dropped. However, in different runs it can happen 
that it is not dropped - basically, onTimer was never triggered before this 
event arrives, so it gets registered. Is this correct? Am I missing something?


   @Test
  def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
val data = Seq(
  Left((1500L, (1L, 15, "Hello"))),
  Left((1600L, (1L, 16, "Hello"))),
  Left((1000L, (1L, 1, "Hello"))),
  Left((2000L, (2L, 2, "Hello"))),
  Right(1000L),
  Left((2000L, (2L, 2, "Hello"))),
  Left((2000L, (2L, 3, "Hello"))),
  Left((3000L, (3L, 3, "Hello"))),
  Right(2000L),
  Left((4000L, (4L, 4, "Hello"))),
  Right(3000L),
  Left((5000L, (5L, 5, "Hello"))),
  Right(5000L),
  Left((6000L, (6L, 6, "Hello"))),
  Left((6500L, (6L, 65, "Hello"))),
  Right(7000L),
  Left((9000L, (6L, 9, "Hello"))),
  Left((9500L, (6L, 18, "Hello"))),
  Left((9000L, (6L, 9, "Hello"))),
  Right(10000L),
  Left((10000L, (7L, 7, "Hello World"))),
  Left((11000L, (7L, 17, "Hello World"))),
  Left((11000L, (7L, 77, "Hello World"))),
  Right(12000L),
  Left((14000L, (7L, 18, "Hello World"))),
  Right(14000L),
  Left((15000L, (8L, 8, "Hello World"))),
  Left((1000L, (8L, 8, "Too late - Hello World"))),   // event is out of 
order and should be dropped
  Right(17000L),
  Left((20000L, (20L, 20, "Hello World"))),
  Right(19000L))




-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Thursday, April 27, 2017 3:17 PM
To: dev@flink.apache.org
Subject: Re: question about rowtime processfunction - are watermarks needed?

Hi Radu,

event-time processing requires watermarks. Operators use watermarks to compute 
the current event-time.
The ProcessFunctions for over range windows use the TimerServices to group 
elements by time.
In case of event-time, the timers are triggered by the event-time of the 
operator which is derived from the received watermarks.
In case of processing-time, the timers are triggered based on the wallclock 
time of the operator.

So by using event-time timers, we implicitly rely on the watermarks because the 
timers are triggered based on the received watermarks.

Best, Fabian
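
For reference, the pattern being described, as a minimal hypothetical 
ProcessFunction (not the actual RowTimeBoundedRangeOver code; it assumes 
timestamps have been assigned upstream):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class EventTimeTimerSketch extends ProcessFunction[Row, Row] {

  override def processElement(
      in: Row,
      ctx: ProcessFunction[Row, Row]#Context,
      out: Collector[Row]): Unit = {
    val ts: Long = ctx.timestamp() // the record's event-time timestamp
    // the timer fires only once the operator's event-time clock, which is
    // driven by the incoming watermarks, passes this timestamp
    ctx.timerService().registerEventTimeTimer(ts)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[Row, Row]#OnTimerContext,
      out: Collector[Row]): Unit = {
    // the watermark has passed `timestamp`: emit/retract the buffered rows here
  }
}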


2017-04-27 10:51 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I am looking at the implementation of  RowTimeBoundedRangeOver (in the 
> context of Stream SQL). I see that the logic is that progress 
> happens based on the timestamps of the row events - i.e., when an event 
> arrives we register it to be processed based on its timestamp (ctx.timerService.
> registerEventTimeTimer(triggeringTs))
>
> In the onTimer we remove (retract) data that has expired. However, we 
> do not consider watermarks nor some allowed latency for the events or 
> anything like this, which makes me ask:
> Don't we need to work with watermarks when we deal with event time? And 
> keep the events within the allowed delay / next watermark?  Am I 
> missing something? Or maybe we just do not consider 
> allowedLateness in this version?
>
> Thanks
>
> Best regards,
>
>


question about rowtime processfunction - are watermarks needed?

2017-04-27 Thread Radu Tudoran
Hi,

I am looking at the implementation of RowTimeBoundedRangeOver (in the context 
of Stream SQL). I see that the logic is that progress happens based on the 
timestamps of the row events - i.e., when an event arrives we register it to be 
processed based on its timestamp 
(ctx.timerService.registerEventTimeTimer(triggeringTs))

In the onTimer we remove (retract) data that has expired. However, we do not 
consider watermarks nor some allowed latency for the events or anything like 
this, which makes me ask:
Don't we need to work with watermarks when we deal with event time? And keep the 
events within the allowed delay / next watermark?  Am I missing something? Or 
maybe we just do not consider allowedLateness in this version?

Thanks

Best regards,



RE: Sliding Window - Weird behaviour

2017-04-13 Thread Radu Tudoran
Hi,

You need to implement your own trigger. You do this when you create your 
window, by assigning the trigger. In your custom trigger you would need to 
implement the desired logic in the onElement method.
You can keep a counter that you increment for each element and FIRE only when 
this value reaches the threshold at which you want to trigger.

You can take a look at the existing triggers:
https://github.com/apache/flink/tree/1875cac03042dad4a4c47b0de8364f02fbe457c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers
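
A minimal sketch of such a trigger (hypothetical class name; it counts the 
elements per window and fires every time the threshold is reached):

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

class CountThresholdTrigger[W <: Window](maxCount: Long) extends Trigger[Any, W] {

  private val stateDesc = new ReducingStateDescriptor[java.lang.Long](
    "count", new Sum, LongSerializer.INSTANCE)

  override def onElement(element: Any, timestamp: Long, window: W,
      ctx: Trigger.TriggerContext): TriggerResult = {
    val count = ctx.getPartitionedState(stateDesc)
    count.add(1L)
    if (count.get().longValue() >= maxCount) {
      count.clear()          // reset the counter ...
      TriggerResult.FIRE     // ... and evaluate the window
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def onProcessingTime(time: Long, window: W,
      ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: W,
      ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: W, ctx: Trigger.TriggerContext): Unit =
    ctx.getPartitionedState(stateDesc).clear()

  private class Sum extends ReduceFunction[java.lang.Long] {
    override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
  }
}

A window definition would then reference it via .trigger(new CountThresholdTrigger(n)).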
 


-Original Message-
From: madhairsilence [mailto:harish.kum...@tcs.com] 
Sent: Thursday, April 13, 2017 12:25 PM
To: dev@flink.apache.org
Subject: Re: Sliding Window - Weird behaviour

Hi Xingcan

Thanks for the answer. But to my understanding,

countWindow(4,2) - should wait for 4 elements (or a window of not more than 4
elements) and, once the window is ready, slide by two items.

Now, if I stop asking the why questions and focus on my current 
problem, how do I achieve this expected output?

Stream : 1,2,3,4,5,6,7,8...

Output:
1,2
2,3
3,4
4,5...



--
View this message in context: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sliding-Window-Weird-behaviour-tp17013p17019.html
Sent from the Apache Flink Mailing List archive. mailing list archive at 
Nabble.com.


RE: Question about the process order in stream aggregate

2017-04-11 Thread Radu Tudoran
Hi Xingcan,

If you need to guarantee the order also in the procTime case, a trick you can 
use is to set the working time of the env to processing time and to assign the 
proctime to the incoming stream as its timestamp. You can do this via 
.assignTimestampsAndWatermarks(new ...) and override:

override def extractTimestamp(
    element: type...,
    previousElementTimestamp: Long): Long = {
  System.currentTimeMillis()
}

Alternatively you can play around with the stream source and control the time 
when the events come
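
A minimal sketch of such an assigner (hypothetical class name; Order is the 
case class from the test further down in this thread):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch: stamp every record with the wall-clock time at which it passes the assigner.
class ProcTimeAsRowTimeAssigner extends AssignerWithPeriodicWatermarks[Order] {

  private var maxSeen = 0L

  override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
    val now = System.currentTimeMillis()
    maxSeen = math.max(maxSeen, now)
    now
  }

  override def getCurrentWatermark(): Watermark = new Watermark(maxSeen - 1)
}

// usage: orderA.assignTimestampsAndWatermarks(new ProcTimeAsRowTimeAssigner)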

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang 
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: fhue...@gmail.com [mailto:fhue...@gmail.com] 
Sent: Tuesday, April 11, 2017 2:24 PM
To: Stefano Bortoli; dev@flink.apache.org
Subject: AW: Question about the process order in stream aggregate

Resending to dev@f.a.o

Hi Xingcan,

This is expected behavior. In general, it is not possible to guarantee results 
for processing time.

Your query is translated as follows:

CollectionSrc(1) -round-robin-> MapFunc(n) -hash-part-> ProcessFunc(n) -fwd-> 
MapFunc(n) -fwd-> Sink(n)

The order of records is changed because of the connection between source and 
first map function. Here, records are distributed round robin to increase the 
parallelism from 1 to n. The parallel instances of map might forward the 
records in different order to the ProcessFunction that computes the 
aggregation. 

Hope this helps,
Fabian


Von: Stefano Bortoli
Gesendet: Dienstag, 11. April 2017 14:10
An: dev@flink.apache.org
Betreff: RE: Question about the process order in stream aggregate

Hi Xingcan,

Are you using parallelism 1 for the test?  procTime semantics deal with the 
objects as they are loaded in the operators. It could be that the co-occurring 
partitioned events (in the same millisecond time frame) are processed in 
parallel and the output is then produced in a different order.

I suggest you to have a look at the integration test to verify that the 
configuration of your experiment is correct.

Best,
Stefano

-Original Message-
From: Xingcan Cui [mailto:xingc...@gmail.com] 
Sent: Tuesday, April 11, 2017 5:31 AM
To: dev@flink.apache.org
Subject: Question about the process order in stream aggregate

Hi all,

I run some tests for stream aggregation on rows. The data stream is simply 
registered as

val orderA: DataStream[Order] = env.fromCollection(Seq(
  Order(1L, "beer", 1),
  Order(2L, "diaper", 2),
  Order(3L, "diaper", 3),
  Order(4L, "rubber", 4)))
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount),

and the SQL is defined as

select product, sum(amount) over (partition by product order by procTime() rows 
between unbounded preceding and current row) from orderA.

My expected output should be

2> Result(beer,1)
2> Result(diaper,2)
1> Result(rubber,4)
2> Result(diaper,5).

However, sometimes I get the following output

2> Result(beer,1)
2> Result(diaper,3)
1> Result(rubber,4)
2> Result(diaper,5).

It seems that the row "Order(2L, "diaper", 2)" and "Order(3L, "diaper", 3)"
are out of order. Is that normal?

BTW, when I run `orderA.keyBy(2).map{x => x.amount + 1}.print()`, the order for 
them can always be preserved.

Thanks,
Xingcan



RE: FW: [DISCUSS] Table API / SQL indicators for event and processing time

2017-03-20 Thread Radu Tudoran
Hi,

I am not sure it is only about setting the timestamp within the query, but you 
can imagine that there are examples where you have different timestamps, as 
mentioned. Take for example the case where we make a purchase online. You have:
-time of purchase  (when the payment was input/triggered)
-time of executing the transaction at bank (when the payment is processed from 
account)
-time of settlement (when the payment is executed at merchant bank - when money 
are received by the seller)

In such a scenario you can imagine that over the same stream of online payments 
you might want to run different queries, each of which might be driven by one of 
these times. Supporting such a scenario would mean that we have one input 
stream that enters the Flink engine via a table source, and then we can 
run different queries against it:
e.g. SELECT SUM(amount) ORDER BY rowtime(time_purchase) LIMIT 10   //you want 
the amount over your last 10 orders
e.g. SELECT SUM(amount) ORDER BY rowtime(time_settlement) LIMIT 10 //you want 
the amount over your last 10 income

Best regards,


-Original Message-
From: Timo Walther [mailto:twal...@apache.org] 
Sent: Monday, March 20, 2017 2:05 PM
To: dev@flink.apache.org
Subject: Re: FW: [DISCUSS] Table API / SQL indicators for event and processing 
time

Yes, you are right. In the current design the user cannot assign 
timestamps and watermarks in a table program. Operators (such as windows) 
might adapt the metatimestamp; if this is the case, this adaptation might 
need to be expressed in the query itself too.

E.g. for a tumbling windows we could limit the select part to 
table.select('rowtime.ceil(DAY) as 'newRowtime) (so that logical rowtime 
matches the physical metatimestamp)

Do you have a good example use case that needs the assignment of rowtime 
within a query?

Am 20/03/17 um 13:39 schrieb Radu Tudoran:
> Hi,
>
> As suggested by Timo - I am forwarding this to the mailing list. Sorry for 
> not having the conversation directly here - I initially thought it might not 
> be of interest...
>
> @Timo - thanks for the clarification. I get the main point now which is that 
> the rowtime is encoded within the  metadata of the record. I think this is 
> key. My view on the matter was maybe a bit outdated in the sense that I saw 
> the processing pipeline as an input source (as you exemplify - a table scan) 
> and from there you have a timestamp and water mark assigner before the 
> processing actually starts. So by overriding the timestamp extractor you 
> match the field that carries the eventtime/rowtime with the mechanism from 
> flink. But as far as I understand this would not be the case anymore...am I 
> right? In case the assignment of the rowtime to the metadata of the record is 
> done differently - what would be the way to do it?
>
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from HUAWEI, 
> which is intended only for the person or entity whose address is listed 
> above. Any use of the information contained herein in any way (including, but 
> not limited to, total or partial disclosure, reproduction, or dissemination) 
> by persons other than the intended recipient(s) is prohibited. If you receive 
> this e-mail in error, please notify the sender by phone or email immediately 
> and delete it!
>
>
> -Original Message-
> From: Timo Walther [mailto:twal...@apache.org]
> Sent: Monday, March 20, 2017 12:29 PM
> To: Radu Tudoran
> Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing 
> time
>
> You are not bothering me, it is very interesting to compare the design
> with real world use cases.
>
> In your use case we would create table like: tEnv.toTable('date, 'time1,
> 'time2, 'data, 'myrowtime.rowtime)
>
> We would not "overwrite" an actual attribute of the record but only add
> logical "myrowtime". In general, just to make it clear again, the
> rowtime must be in the metatimestamp of the record (by using a timestamp
> extractor before). The Table API assumes that records that enter the
> Table API are timestamped correctly. So in your use case, you would
> create your own TableSourc

FW: [DISCUSS] Table API / SQL indicators for event and processing time

2017-03-20 Thread Radu Tudoran
Hi,

As suggested by Timo - I am forwarding this to the mailing list. Sorry for not 
having the conversation directly here - I initially thought it might not be of 
interest...

@Timo - thanks for the clarification. I get the main point now, which is that 
the rowtime is encoded within the metadata of the record. I think this is key. 
My view on the matter was maybe a bit outdated in the sense that I saw the 
processing pipeline as an input source (as you exemplify - a table scan) and 
from there you have a timestamp and watermark assigner before the processing 
actually starts. So by overriding the timestamp extractor you match the field 
that carries the event time/rowtime with the mechanism from Flink. But as far as 
I understand this would not be the case anymore... am I right? In case the 
assignment of the rowtime to the metadata of the record is done differently - 
what would be the way to do it?


Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Timo Walther [mailto:twal...@apache.org] 
Sent: Monday, March 20, 2017 12:29 PM
To: Radu Tudoran
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time

You are not bothering me, it is very interesting to compare the design 
with real world use cases.

In your use case we would create a table like: tEnv.toTable('date, 'time1, 
'time2, 'data, 'myrowtime.rowtime)

We would not "overwrite" an actual attribute of the record but only add a 
logical "myrowtime". In general, just to make it clear again, the 
rowtime must be in the metatimestamp of the record (by using a timestamp 
extractor before). The Table API assumes that records that enter the 
Table API are timestamped correctly. So in your use case, you would 
create your own TableSource, extract the timestamp based on your 3 time 
fields, and define an attribute that represents the rowtime logically. In 
the current design we want the Table API to rely on Flink's time 
handling, because time handling can be very tricky. So we only support 
one event-time field.

But would it be possible to post our discussion on the ML? It might be 
interesting for others as well. If yes, can you forward our conversion 
to the ML?

Timo



Am 20/03/17 um 12:11 schrieb Radu Tudoran:
> Thanks for the replies.
>
> Regarding the reply to "It might be sometimes that this is not explicit to be 
> guessed": "That is why I added the RelTimeConverter. After this conversion 
> step it should be as explicit as possible (by using the special types). And 
> we can add special handling of functions (i.e. ceil) that preserve the 
> monotonicity."
>
> ...maybe I am missing something, so sorry if I just bother you for nothing (it 
> is just to make sure we think of all cases beforehand). I saw examples of 
> applications where you have multiple fields of the same type. For example an 
> event can have 3 time fields of TIMESTAMP, 1 of DATE and 2 of TIME (this is 
> actually from a real application with some sort of standard communication 
> schema). In such cases it is unclear to me how the code will identify the 
> exact field to use as rowtime, for example. This is what I meant about how we 
> are passing indicators to spot the rowtime field, as well as what would happen 
> with the code in such a situation, as it can identify multiple time fields.
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG

RE: [DISCUSS] Table API / SQL indicators for event and processing time

2017-03-20 Thread Radu Tudoran
>>>>>>>> adapt the watermarks which is not trivial.
>>>>>>>> Instead, I think we should initially only allow very few
>> monotone
>>>>>>>> transformations which are aligned with the existing watermarks.
>> We
>>>>>>>> might later relax this condition if we see that users request
>> this
>>>>>> feature.
>>>>>>>> You are right, that we need to track which attribute can be 
>>>>>>>> used
>>> as
>>>> a
>>>>>>>> time attribute (i.e., is increasing and guarded by watermarks).
>>>>>>>> For that we need to expose the time attribute when a Table is
>>>> created
>>>>>>>> (either when a DataStream is converted like:
>> stream.toTable(tEnv,
>>>> 'a,
>>>>>>>> 'b,
>>>>>>>> 't.rowtime) or in a StreamTableSource) and track how it is used
>> in
>>>>>>>> queries.
>>>>>>>> I am not sure if the monotone property would be the right 
>>>>>>>> choice here, since data is only quasi-monotone and a monotone
>> annotation
>>>>>>>> might trigger some invalid optimizations which change the
>>> semantics
>>>> of
>>>>>> a query.
>>>>>>>> Right now, Calcite does not offer a quasi-monotone property (at
>>>> least
>>>>>>>> I haven't found it).
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>>
>>>>>>>> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>> As I said in another thread, the main difference between 
>>>>>>>>> stream
>>> and
>>>>>>>>> table is that a stream is an ordered list while a table is an
>>>>>> unordered set.
>>>>>>>>> Without considering the out-of-order problem in practice,
>> whether
>>>>>>>>> event-time or processing-time can be just taken as a
>>> monotonically
>>>>>>>>> increasing field and that's why the given query[1] would work.
>> In
>>>>>>>>> other words, we must guarantee the "SELECT MAX(t22.rowtime)
>> ..."
>>>>>>>>> subquery returns a single value that can be retrieved from the 
>>>>>>>>> cached dynamic table since it's dangerous to join two
>> un-windowed
>>>>>>>>> streams.
>>>>>>>>>
>>>>>>>>> Under this circumstance, I just consider adding a "monotonic 
>>>>>>>>> hint"(INC or
>>>>>>>>> DEC) to the field of a (generalized) table (maybe using an 
>>>>>>>>> annotation on the registerDataXX method) that can be used to 
>>>>>>>>> indicate whether a field is monotonically increasing or
>>> decreasing.
>>>>>>>>> Then by taking rowtime as common (monotonically increasing)
>>> field,
>>>>>>>>> there are several benefits:
>>>>>>>>>
>>>>>>>>> 1) This can uniform the table and stream by importing total
>>>> ordering
>>>>>>>>> relation to an unordered set.
>>>>>>>>>
>>>>>>>>> 2) These fields can be modified arbitrarily as long as they
>> keep
>>>> the
>>>>>>>>> declared monotonic feature and the watermark problem does not
>>> exist
>>>>>>>>> any more.
>>>>>>>>>
>>>>>>>>> 3) The monotonic hint will be useful in the query optimization
>>>>> process.
>>>>>>>>> What do you think?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Xingcan
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> SELECT t1.amount, t2.rate
>>>>>>>>> FROM table1 AS t1,
>>>>>>>>>      table2 AS t2
>>>>>>>>> WHERE
>>>>>>>>> t1.cu

Extended SQL support for Streaming

2017-03-16 Thread Radu Tudoran
Hi,

I have created several JIRA issues with proposed implementations to support more 
SQL functions for streaming. As there is an ongoing discussion about supporting 
retraction for Stream SQL, these features focus on processing time. 
Extending them to event time would be done (in most cases similarly) 
after the new retraction mechanisms are merged. The JIRA issues are:
1. FLINK-6082<https://issues.apache.org/jira/browse/FLINK-6082> - Support 
window definition for SQL Queries based on WHERE clause with time condition
2. FLINK-6081<https://issues.apache.org/jira/browse/FLINK-6081> - 
Offset/Fetch support for SQL Streaming
3. FLINK-6077<https://issues.apache.org/jira/browse/FLINK-6077> - Support 
In/Exists/Except/Any/Some/All for Stream SQL
4. FLINK-6075<https://issues.apache.org/jira/browse/FLINK-6075> - Support 
Limit/Top(Sort) for Stream SQL
5. FLINK-6073<https://issues.apache.org/jira/browse/FLINK-6073> - Support 
for SQL inner queries for proctime
Please let me know what you think; any feedback about the design/priority for 
the roadmap/implementation is of course more than welcome.

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division

[cid:image007.jpg@01CD52EB.AD060EE0]
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com<http://www.huawei.com/>
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!



Scala / Java window issue

2017-03-10 Thread Radu Tudoran
Hi,

I am struggling to move a working implementation from Java to Scala :(... this 
is for computing window aggregates (sliding window).
As I am not proficient in Scala I got blocked on (probably a stupid 
error)... maybe someone can help me.


I am trying to create a simple window function to be applied to the datastream 
after the window is created (I have one case with global windows and another 
case with keyed windows, so the question applies both to AllWindowFunction as 
well as to WindowFunction). However, I get a type mismatch error when applying 
the function to the window.

As I need to implement the function in Scala... I tried 2 options, which both 
fail:
Option 1: implement MyWindowFunction by extending the WindowFunction from the 
Scala package (org.apache.flink.streaming.api.scala.function)
..in this case, when I apply the function to the window, it tells me that 
there is a type mismatch
Option 2: implement MyWindowFunction by extending the WindowFunction from the 
default package (org.apache.flink.streaming.api.functions.windowing)
..in this case, when I try to override the apply function, I get a compilation 
error that the class needs to be abstract as it does not implement the apply 
function :(

...any solution?
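In case it helps, a minimal sketch for Option 1 that should compile against the 
Scala-specific WindowFunction; the tuple types and the keyed sliding window are 
assumptions about the use case:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// sums the second tuple field per key and per (sliding) window
class MyWindowFunction
    extends WindowFunction[(String, Double), (String, Double), String, TimeWindow] {

  override def apply(key: String, window: TimeWindow,
                     input: Iterable[(String, Double)],
                     out: Collector[(String, Double)]): Unit = {
    out.collect((key, input.map(_._2).sum))
  }
}

// usage sketch - note that keying with a lambda gives the String key type
// expected above; keyBy(0) would produce a Tuple key and a type mismatch:
// stream.keyBy(_._1)
//   .timeWindow(Time.minutes(10), Time.minutes(1))
//   .apply(new MyWindowFunction)

One frequent cause of the type mismatch in Option 1 is exactly that key type: 
keying by position yields a generic Tuple key, which does not line up with the 
KEY type parameter of the Scala WindowFunction.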





TRAVIS error?

2017-03-06 Thread Radu Tudoran
Hi,

I was looking over the Travis test & deploy status (because of a failure that was 
unrelated to my contribution in the pull request). I see a lot of failures 
also for the other pull requests.
There are typically 2 build jobs that fail across pull requests: 
one related to the fact that Kafka has some errors and one to the fact that 
some Maven error happens:
Trying to KILL watchdog (1345).
./tools/travis_mvn_watchdog.sh: line 210: 1345 Terminated watchdog
MVN exited with EXIT CODE: 1.
java.io.FileNotFoundException: build-target/lib/flink-dist-.jar (No such file 
or directory)
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:220)
at java.util.zip.ZipFile.<init>(ZipFile.java:150)
at java.util.zip.ZipFile.<init>(ZipFile.java:121)
at sun.tools.jar.Main.list(Main.java:1060)
at sun.tools.jar.Main.run(Main.java:291)
at sun.tools.jar.Main.main(Main.java:1233)

Failing jobs:
· 14570.4 - JDK: oraclejdk8, PROFILE="-Dhadoop.version=2.6.3 
  -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true", 
  21 min 30 sec <https://travis-ci.org/apache/flink/jobs/208151041>
· 14570.5 - JDK: oraclejdk8, PROFILE="-Dhadoop.version=2.6.3 
  -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" 
  <https://travis-ci.org/apache/flink/jobs/208151042>



Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division

[cid:image007.jpg@01CD52EB.AD060EE0]
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com<http://www.huawei.com/>
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!



RE: [DISCUSS] Table API / SQL indicators for event and processing time

2017-02-15 Thread Radu Tudoran
Hi,

My initial thought would be that it makes more sense to have procTime() and 
rowTime() only as functions which in fact are to be used as markers. Having the 
value (even from special system attributes) does not make sense in some scenarios 
such as the ones for creating windows, e.g.,
If you have SELECT Count(*) OVER (ORDER BY procTime()...) 
If you get the value of procTime you cannot do anything, as you need the marker 
to know how to construct the window logic.

However, your final idea of having "implement some rule/logic that translates 
the attributes to special RexNodes internally" I believe is good and gives a 
solution to both problems. On the one hand, for those scenarios where you need 
the value you can access the value, while for others you can see the special 
type of the RexNode and use it as a marker. 

Regarding keeping this data in a table... I am not sure, as you say we would 
need to augment the data with two fields whether needed or not... this is not 
necessarily very efficient.


Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Timo Walther [mailto:twal...@apache.org] 
Sent: Wednesday, February 15, 2017 9:33 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time

Hi all,

at first I also thought that built-in functions (rowtime() and
proctime()) are the easiest solution. However, I think to be future-proof we 
should make them system attributes; esp. to relate them to a corresponding 
table in case of multiple tables. Logically they are attributes of each row, 
which is already done in Table API.

I will ask on the Calcite ML if there is a good way for integrating system 
attributes. Right now, I would propose the following implementation:

- we introduce a custom row type (extending RelDataType)
- in a streaming environment every row has two attributes by default (rowtime 
and proctime)
- we do not allow creating a row type with those attributes (this should 
already prevent `SELECT field AS rowtime FROM ...`)
- we need to ensure that these attributes are not part of expansion like 
`SELECT * FROM ...`
- implement some rule/logic that translates the attributes to special RexNodes 
internally, such that the optimizer does not modify these attributes

What do you think?

Regards,
Timo




Am 15/02/17 um 03:36 schrieb Xingcan Cui:
> Hi all,
>
> thanks for this thread.
>
> @Fabian If I didn't miss the point, the main difference between the 
> two approaches is whether or not taking these time attributes as 
> common table fields that are directly available to users. Whatever, 
> these time attributes should be attached to records (right?), and the 
> discussion lies in whether give them public qualifiers like other 
> common fields or private qualifiers and related get/set methods.
>
> The former (system attributes) approach will be more compatible with 
> existing SQL read-only operations (e.g., select, join), but we need to 
> add restrictions on SQL modification operation (like what?). I think 
> there are no needs to forbid users modifying these attributes via 
> table APIs (like map function). Just inform them about these special 
> attribute names like system built in aggregator names in iteration.
>
> As for the built in function approach, I don't know if, for now, there 
> are functions applied on a single row (maybe the value access 
> functions like COMPOSITE.get(STRING)?). It seems that most of the 
> built in functions work for a single field or on columns and thus it 
> will be mountains of work if we want to add a new kind of function to 
> SQL. Maybe all existing operations should be modified to support it.
>
> All in all, if there are existing supports for single row function, I 
> prefer the built in function approach. Otherwise the system attributes 
> approach should be better

RE: [DISCUSS] Table API / SQL indicators for event and processing time

2017-02-13 Thread Radu Tudoran
Hi Fabian,

Thanks for starting the discussion. Before I give my thoughts on this, can you 
please give some examples of how you would see the option of using "system 
attributes"?
Do you use this when you register the stream as a table, do you use it when you 
call an SQL query, or do you use it when you translate a table back to a stream / 
write it to a dynamic table?

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Tuesday, February 14, 2017 1:01 AM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL indicators for event and processing time

Hi,

I'd like to start an discussion about how Table API / SQL queries indicate 
whether an operation is done in event or processing time.

1) Why do we need to indicate the time mode?

We need to distinguish event time and processing time mode for operations in 
queries in order to have the semantics of a query fully defined.
This cannot be globally done in the TableEnvironment because some queries 
explicitly request an expression such as the ORDER BY clause of an OVER window 
with PRECEDING / FOLLOWING clauses.
So we need a way to specify something like the following query:

SELECT
  a,
  SUM(b) OVER (PARTITION BY c ORDER BY proctime
               ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
FROM myStream

where "proctime" indicates processing time. Equivalently "rowtime" would 
indicate event time.

2) Current state

The current master branch implements time support only for grouping windows in 
the Table API.
Internally, the Table API converts a 'rowtime symbol (which looks like a 
regular attribute) into a special expression which indicates event-time.
For example:

table
  .window(Tumble over 5.milli on 'rowtime as 'w)
  .groupBy('a, 'w)
  .select(...)

defines a tumbling event-time window.

Processing-time is indicated by omitting a time attribute (table.window(Tumble 
over 5.milli as 'w) ).

3) How can we do that in SQL?

In SQL we cannot add special expressions without touching the parser which we 
don't want to do because we want to stick to the SQL standard.
Therefore, I see only two options: adding system attributes or
(parameterless) built-in functions. I list some pros and cons of the approaches 
below:

1. System Attributes:
+ most natural way to access a property of a record.
+ works with joins, because time attributes can be related to tables
- We need to ensure the attributes are not writable and always present in 
streaming tables (i.e., they should be system defined attributes).
- Need to adapt existing Table API expressions (will not change the API but 
some parts of the internal translation)
- Event time value must be set when the stream is converted, processing time is 
evaluated on the fly

2. Built-in Functions
+ Users could try to modify time attributes, which is not possible with
functions
- do not work with joins, because we need to address different relations
- not a natural way to access a property of a record

I think the only viable choice is system attributes, because built-in 
functions cannot be used for joins.
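As a hedged illustration of that join argument (sketch only; the table names, 
the per-table rowtime attribute, and the exact API entry points are assumptions 
in the context of this discussion):

import org.apache.flink.table.api.{Table, TableEnvironment}

// with system attributes, the event time of each input can be addressed per
// table inside the join predicate; a parameterless rowtime() function could
// not say which relation's time it refers to
def rowtimeJoin(tEnv: TableEnvironment): Table =
  tEnv.sql(
    """
      |SELECT o.amount, r.rate
      |FROM Orders AS o, Rates AS r
      |WHERE o.currency = r.currency
      |  AND r.rowtime <= o.rowtime
    """.stripMargin)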
However, system attributes are the more complex solution because they need a 
better integration with Calcite's SQL validator (preventing user attributes 
which are named rowtime for instance).

Since there are currently a several contributions on the way (such as SQL OVER 
windows FLINK-5653 to FLINK-5658) that need time indicators, we need a solution 
soon to be able to make progress.
There are two PRs, #3252 and #3271, which implement the built-in marker 
functions proctime() and rowtime() and which could serve as a temporary 
solution (since we do not work on joins yet).
I would like to suggest to use these functions as a starting point (once the 
PRs are merged) and later change to the system attribute solution which needs a 
bit more time to be implemented.

I talked with Timo today about this i

RE: Stream SQL and Dynamic tables

2017-02-07 Thread Radu Tudoran
Hi,

I made some comments on the Dynamic Tables document. Not sure how to ask for 
feedback on them... hence this email.

Please let me know what you think.

https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU/edit#heading=h.3eo2vkvydld6

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Monday, January 30, 2017 9:07 PM
To: dev@flink.apache.org
Subject: Re: Stream SQL and Dynamic tables

Hi Radu,

yes, the clean-up timeout would need to be defined somewhere.
I would actually prefer to do that within the query, because the clean-up 
timeout affects the result and hence the semantics of the computed result.
This could look for instance as

SELECT a, sum(b)
FROM myTable
WHERE rowtime BETWEEN now() - INTERVAL '1' DAY AND now()
GROUP BY a;

In this query now() would always refer to the current time, i.e., the current 
wall-clock time for processing time or the current watermark time for event 
time.
The result of the query would be the grouped aggregate of the data received in 
the last day.
We can add syntactic sugar with built-in functions as for example:
last(rowtime, INTERVAL '1' DAY).

In addition we can also add a configuration parameter to the TableEnvironment 
to control the clean-up timeout.

Cheers,
Fabian

2017-01-30 18:14 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi Fabian,
>
> Thanks for the clarifications. I have a follow up question: you say 
> that operations are expected to be bounded in space and time (e.g., 
> the optimizer will do a cleanup after a certain timeout period). - can 
> I assume that this will imply that we will have at the level of the 
> system a couple of parameters that hold these thresholds and potentially can 
> be setup?
>
> For example having in the environment variable
>
> Env.setCleanupTimeout(100,TimeUnit.Minutes);
>
> ...or alternatively perhaps directly at the level of the table (either 
> table environment or the table itself)
>
> TableEnvironment tbEnv =...
> tbEnv.setCleanupTimeOut(100,TimeUnit.Minutes)
> Table tb=
> tb.setCleanupTimeOut(100,TimeUnit.Minutes)
>
>
>
> -Original Message-
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Friday, January 27, 2017 9:41 PM
> To: dev@flink.apache.org
> Subject: Re: Stream SQL and Dynamic tables
>
> Hi Radu,
>
> the idea is to only support operations that are bounded in space and 
> compute time:
>
> - space: the size of state may not infinitely grow over time or with 
> growing key domains. For these cases, the optimizer will enforce a 
> cleanup timeout and all data which is passed that timeout will be discarded.
> Operations which cannot be bounded in space will be rejected.
>
> - compute time: certain queries can not be efficiently executed because 
> newly arriving data (late data or just newly appended rows) might 
> trigger recomputation of large parts of the current state. Operations 
> that will result in such a computation pattern will be rejected. One 
> example would be event-time OVER ROWS windows as we discussed in the other 
> thread.
>
> So the plan is that the optimizer takes care of limiting the space 
> requirements and computation effort.
> However, you are of course right. Retraction and long running windows 
> can result in significant amounts of operator state.
> I don't think this is a special requirement for the Table API (there 
> are users of the DataStream API with jobs that manage TBs of state). 
> Persisting state to disk with RocksDB and scaling out to more nodes 
> should address the scaling problem initially. In the long run, the 
> Flink community will work to improve the handling of large state with 
> features such as incremental checkpoints and new state backends.
>
&g

RE: [jira] [Created] (FLINK-5722) Implement DISTINCT as dedicated operator

2017-02-06 Thread Radu Tudoran
Hi,

Should we also discuss the design of DISTINCT for the streaming case?
It might fit well in the context of tables as well as in the context of 
aggregates over windows...

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Fabian Hueske (JIRA) [mailto:j...@apache.org] 
Sent: Monday, February 06, 2017 2:56 PM
To: dev@flink.apache.org
Subject: [jira] [Created] (FLINK-5722) Implement DISTINCT as dedicated operator

Fabian Hueske created FLINK-5722:


 Summary: Implement DISTINCT as dedicated operator
 Key: FLINK-5722
 URL: https://issues.apache.org/jira/browse/FLINK-5722
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.2.0, 1.3.0
Reporter: Fabian Hueske


DISTINCT is currently implemented for batch Table API / SQL as an aggregate 
which groups on all fields. Grouped aggregates are implemented as GroupReduce 
with sort-based combiner.

This operator can be more efficiently implemented by using ReduceFunction and 
hinting a HashCombine strategy. The same ReduceFunction can be used for all 
DISTINCT operations and can be assigned with appropriate forward field 
annotations.
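As an illustration of the ReduceFunction idea, a hedged DataSet API sketch (the 
case class and the explicit grouping on all of its fields are assumptions; the 
combine-strategy hint is left out):

import org.apache.flink.api.scala._

// DISTINCT expressed as a grouping on all fields plus a ReduceFunction that
// simply keeps one representative per group of duplicates
case class Order(user: String, product: String, amount: Int)

def distinctOrders(input: DataSet[Order]): DataSet[Order] =
  input
    .groupBy("user", "product", "amount") // group on all fields
    .reduce((first, _) => first)          // any duplicate representative will do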

We would need a custom conversion rule which translates distinct aggregations 
(grouping on all fields and returning all fields) into a custom DataSetRelNode.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


RE: STREAM SQL inner queries

2017-02-02 Thread Radu Tudoran
Hi Fabian,

Thanks for this perspective. I agree with you.
I will wait a couple more days to see if others provide additional 
feedback. After this I will start working on defining the JIRA issue that we can 
refine. Also, as you mentioned, I believe there will be several iterations and 
refinements of the implementation (potentially starting from a very simple and 
naïve version and then extending it to provide more elegant semantics).



Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Friday, February 03, 2017 12:15 AM
To: dev@flink.apache.org
Subject: Re: STREAM SQL inner queries

Hi Radu,

I think mostly processing time queries will fall into the category of queries 
that will never require to update a previously emitted result.
Event-time queries have to prepare for late arriving data (which is not 
possible with processing time) and use cases that require event-time usually do 
not want to drop late data.

The query semantics that you intended for your first example in this thread 
should be implementable without updates for processing time.
However, the query would need to be specified differently, similar to the 
following one, which was adapted from the Temporal Table document [1].

SELECT i2.amount, i1.id
FROM
  InputStream2 AS i2,
  InputStream1 AS i1
WHERE procTime(i1) = (
  SELECT MAX(procTime(i1_2))
  FROM InputStream1 AS i1_2
  WHERE procTime(i1_2) <= procTime(i2))

Note that the query is not a simple left join, i.e., the number of returned 
rows is larger than the size of InputStream2 if more than one row in
InputStream1 has the same procTime() (might be OK if InputStream1 is only 
slowly changing).

Also note that we have to think more about the procTime() function, which is 
intended to just be a marker for processing time.
However, in the case of join queries we need to indicate which table we refer to. 
So this detail needs to be fleshed out.

I think in principle, we could start to work on such joins, but given that the 
condition is not trivial and involves a correlated subquery it might take more 
time than initially expected to implement the optimization and translation of 
such joins.

Best, Fabian

[1]
https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q

2017-01-31 22:47 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I understand the logic and indeed, considering the "batch and stream 
> query equality", it makes sense to go with the version you have proposed 
> (with the materialized view for inputstream2).
>
> You also mentioned there might be some queries that will never require 
> updating previously emitted results, such as queries that discard late 
> arriving data and do not compute early results. For these we can and 
> should apply runtime optimizations and simply append to the previously 
> emitted records.
>
> Should we thus try to identify now such queries and provide the inner 
> query implementation for these? Should we wait and see how we deal 
> with the materialized view first?
> Basically I am asking what should we do now?
>
> BTW - thanks for the patience for the discussion and for brainstorming 
> on this!
>
> -Original Message-
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Tuesday, January 31, 2017 3:45 PM
> To: dev@flink.apache.org
> Subject: Re: STREAM SQL inner queries
>
> Hi,
>
> If the goal is that the materialized result of a streaming query 
> should be equivalent to the result of a batch query on the 
> materialized input, we need to update previously emitted data.
> Only appending to the already emitted results will not work in most of 
> the cases.
>
> In case of the join query you proposed
>
> SELECT STREAM amount,
> (SELECT id FROM  inputstream1 ORD

RE: STREAM SQL inner queries

2017-01-31 Thread Radu Tudoran
Hi,

I was thinking about this reply...
I am not sure I understand why you would need to keep the whole state for 
Option 2. From my point of view this is not needed (and I see 
this as the easy case). The main reason is that you have the SINGLE_VALUE 
operator, which would imply that you do not keep the whole state but rather 
update a single value. This is of course valid only for the operators that do 
not require re-applying a full aggregation. For example it would work for MIN, 
MAX, value selection (e.g. last value)... but would not work for SUM, average or 
count... unless it is an unbounded aggregation where you only update with the new 
values. Basically we could fail/throw an exception, just like for the dynamic 
tables, in case there are not enough resources to compute the query.

Nevertheless, I see the discussion is moving towards binding this to the concept 
of dynamic tables. In this case I would suggest that the distinction between the 
two options be made based on the existence of an ID in the stream. This is 
the idea that differentiates between append tables and update 
tables. We could use the same here.

If the inner stream on which we apply the inner query has an ID, then option 1 
(recompute and apply updates based on retraction and so on); otherwise 
option 2 (only make an update in the operator that keeps the single value 
- if this is possible).

Best regards,

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Monday, January 30, 2017 8:39 PM
To: dev@flink.apache.org
Subject: Re: STREAM SQL inner queries

Hi Radu,

Updates of the result (materialized view) are not always simple appends. If the 
query is a non-windowed aggregation or a windowed aggregation (or join) with 
late data, some parts of the result need to be removed or updated.
I think in order to implement the second option, we would need to emit the 
complete result for every update because we do not know which parts of the 
previous view became invalid. This is not practical, because it would mean to 
hold the complete result as state and to emit the complete result for every update.

In contrast, the first option sends retraction and update records to update the 
latest view.
Moreover, we only need to hold those results as state that might be updated and 
not the complete result.
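To make "retraction and update records" a bit more concrete, a hedged sketch of 
one possible encoding (purely illustrative, not what the Table API currently emits):

// each emitted element carries a flag saying whether it adds a row to the
// result or retracts a previously emitted one
case class Change[T](isRetract: Boolean, row: T)

// updating a previously emitted aggregate (key -> count) is then encoded as a
// retraction of the old row followed by the new row
val update: Seq[Change[(String, Long)]] = Seq(
  Change(isRetract = true,  ("user1", 10L)), // remove the outdated result
  Change(isRetract = false, ("user1", 11L))) // emit the corrected result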

I agree that the discussion helps a lot.

Best, Fabian

2017-01-30 15:49 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I would like to ask for a further clarifications about the statement:
> " a streaming query should be equivalent to the result of a batch 
> query that is executed on the materialized stream "
>
> I do agree with the principle but the question that we I would like to 
> ask is how do we interpret the relation between a stream and the 
> materialized view of the stream at some point. If we consider that we 
> materialize the view on the elements we received on the stream until 
> moment X (let's say it had elements 1 2 3) and we apply an SQL query, 
> indeed this should give the exact same result as if 1 2 3 would be in 
> a database/batch and we apply the same logic. However, some time later 
> in the future, if we receive another element (e.g. 4) do we have the 
> same materialized view, which we update or we consider a new state, a 
> new materialized view and therefore a new scenario. Basically assuming 
> we take last value. Then we can have two
> options:
>
> Option 1) At moment X the output is 3 (the last value of the 
> materialized view of 1 2 3 is 3) and then at moment X+1 when 4 
> arrives, the last value remains unchanged (it is the same 
> materialized view) Option 2) At moment X the output is 3 (the last value 
> of the materialized view of 1 2 3 is 3) and then at moment X+1 when 4 
> arrives, the f

RE: Stream SQL and Dynamic tables

2017-01-31 Thread Radu Tudoran
Hi,

For someone who likes to program in a more stream-like fashion, I must say I like 
the syntax that you proposed. So I would be fine with keeping it this way.

My only question/concern is whether someone who does SQL as a day-to-day job would 
like this way of writing queries, in which we port at least the time concepts from 
streaming. However, it is not a complex concept to be ported - so I would like 
to believe that it is not a big deal to write SQL queries using this syntax. 
Nevertheless, I just wanted to raise the point.



-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Monday, January 30, 2017 9:07 PM
To: dev@flink.apache.org
Subject: Re: Stream SQL and Dynamic tables

Hi Radu,

yes, the clean-up timeout would need to be defined somewhere.
I would actually prefer to do that within the query, because the clean-up 
timeout affects the result and hence the semantics of the computed result.
This could look for instance as

SELECT a, sum(b)
FROM myTable
WHERE rowtime BETWEEN now() - INTERVAL '1' DAY AND now()
GROUP BY a;

In this query now() would always refer to the current time, i.e., the current 
wall-clock time for processing time or the current watermark time for event 
time.
The result of the query would be the grouped aggregate of the data received in 
the last day.
We can add syntactic sugar with built-in functions as for example:
last(rowtime, INTERVAL '1' DAY).

In addition we can also add a configuration parameter to the TableEnvironment 
to control the clean-up timeout.

Cheers,
Fabian

2017-01-30 18:14 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi Fabian,
>
> Thanks for the clarifications. I have a follow up question: you say 
> that operations are expected to be bounded in space and time (e.g., 
> the optimizer will do a cleanup after a certain timeout period). - can 
> I assume that this will imply that we will have at the level of the 
> system a couple of parameters that hold these thresholds and potentially can 
> be setup?
>
> For example having in the environment variable
>
> Env.setCleanupTimeout(100,TimeUnit.Minutes);
>
> ...or alternatively perhaps directly at the level of the table (either 
> table environment or the table itself)
>
> TableEnvironment tbEnv =...
> tbEnv.setCleanupTimeOut(100,TimeUnit.Minutes)
> Table tb=
> tb.setCleanupTimeOut(100,TimeUnit.Minutes)
>
>
>
> -Original Message-
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Friday, January 27, 2017 9:41 PM
> To: dev@flink.apache.org
> Subject: Re: Stream SQL and Dynamic tables
>
> Hi Radu,
>
> the idea is to only support operations that are bounded in space and 
> compute time:
>
> - space: the size of state may not infinitely grow over time or with 
> growing key domains. For these cases, the optimizer will enforce a 
> cleanup timeout and all data which is passed that timeout will be discarded.
> Operations which cannot be bounded in space will be rejected.
>
> - compute time: certain queries can not be efficiently executed because 
> newly arriving data (late data or just newly appended rows) might 
> trigger recomputation of large parts of the current state. Operations 
> that will result in such a computation pattern will be rejected. One 
> example would be event-time OVER ROWS windows as we discussed in the other 
> thread.
>
> So the plan is that the optimizer takes care of limiting the space 
> requirements and computation effort.
> However, you are of course right. Retraction and long running windows 
> can result in significant amounts of operator state.
> I don't think this is a special requirement for the Table API (there 
> are users of the DataStream API with jobs that manage TBs of state). 
> Persisting state to disk with RocksDB and scaling out to more nodes 
> should address the scaling problem initially. In the long run, the 
> Flink community will work to improve the handling of large state with 
> features such as incremental checkpoints and new state backends.
>
> Looking forward to your comments.
>
> Best,
> Fabian
>
> 2017-01-27 11:01 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi,
> >
> > Thanks for the clarification Fabian - it is really useful.
> > I agree that we should consolidate the module and avoid the need to 
> > further maintain 3 different "projects". It does make sense to see 
> > the current (I would call it)"Stream SQL" as a table with append semantics.
> > However, one thing that should be clarified is what is the best way 
> > from the implementation point of view to keep the state of the table 
> > (if we can actually keep it - though the need is clear for 
> > supporting retraction). As the input is a stream and the table is 
> > append of c

RE: Stream SQL and Dynamic tables

2017-01-30 Thread Radu Tudoran
Hi Fabian,

Thanks for the clarifications. I have a follow-up question: you say that 
operations are expected to be bounded in space and time (e.g., the optimizer 
will do a cleanup after a certain timeout period). Can I assume that this 
implies that we will have, at the level of the system, a couple of parameters 
that hold these thresholds and can potentially be set up?

For example having in the environment variable

Env.setCleanupTimeout(100,TimeUnit.Minutes);

...or alternatively perhaps directly at the level of the table (either table 
environment or the table itself)

TableEnvironment tbEnv =...
tbEnv.setCleanupTimeOut(100,TimeUnit.Minutes)
Table tb=
tb.setCleanupTimeOut(100,TimeUnit.Minutes)



-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Friday, January 27, 2017 9:41 PM
To: dev@flink.apache.org
Subject: Re: Stream SQL and Dynamic tables

Hi Radu,

the idea is to only support operations that are bounded in space and compute 
time:

- space: the size of state may not infinitely grow over time or with growing 
key domains. For these cases, the optimizer will enforce a cleanup timeout and 
all data which is passed that timeout will be discarded.
Operations which cannot be bounded in space will be rejected.

- compute time: certain queries can not be efficiently executed because newly 
arriving data (late data or just newly appended rows) might trigger 
recomputation of large parts of the current state. Operations that will result 
in such a computation pattern will be rejected. One example would be event-time 
OVER ROWS windows as we discussed in the other thread.

So the plan is that the optimizer takes care of limiting the space requirements 
and computation effort.
However, you are of course right. Retraction and long running windows can 
result in significant amounts of operator state.
I don't think this is a special requirement for the Table API (there are users 
of the DataStream API with jobs that manage TBs of state). Persisting state to 
disk with RocksDB and scaling out to more nodes should address the scaling 
problem initially. In the long run, the Flink community will work to improve 
the handling of large state with features such as incremental checkpoints and 
new state backends.

Looking forward to your comments.

Best,
Fabian

2017-01-27 11:01 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> Thanks for the clarification Fabian - it is really useful.
> I agree that we should consolidate the module and avoid the need to 
> further maintain 3 different "projects". It does make sense to see the 
> current (I would call it)"Stream SQL" as a table with append semantics.
> However, one thing that should be clarified is what is the best way 
> from the implementation point of view to keep the state of the table 
> (if we can actually keep it - though the need is clear for supporting 
> retraction). As the input is a stream and the table is append of 
> course we run in the classical infinite issue that streams have. What should 
> be the approach?
> Should we consider keeping the data in something like the statebackend 
> now for windows, and then pushing them to the disk (e.g., like 
> FSStateBackends). Perhaps with the disk we can at least enlarge the 
> horizon of what we keep.
> I will give some comments and some thoughts in the document about this.
>
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert IT R Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
> -Original Message-
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, January 26, 2017 3:37 PM
> To: dev@flink.apache.org
> Subject: Re: Stream SQL and Dynamic tables
>
> Hi Radu,
>
> the idea is to have dynamic tables as the common ground for Table API and
> SQL.
> I

RE: STREAM SQL inner queries

2017-01-30 Thread Radu Tudoran
Hi,

I would like to ask for a further clarifications about the statement:
" a streaming query should be equivalent to the result of a batch query that is 
executed on the materialized stream "

I do agree with the principle, but the question that I would like to ask is 
how we interpret the relation between a stream and the materialized view of 
the stream at some point. If we consider that we materialize the view on the 
elements we received on the stream until moment X (let's say it had elements 1 
2 3) and we apply an SQL query, indeed this should give the exact same result 
as if 1 2 3 were in a database/batch and we applied the same logic. However, 
some time later in the future, if we receive another element (e.g. 4), do we 
still have the same materialized view, which we update, or do we consider a new 
state, a new materialized view and therefore a new scenario? Basically, assume 
we take the last value. Then we can have two options:

Option 1) At moment X the output is 3 (the last value of the materialized view of 
1 2 3 is 3) and then at moment X+1 when 4 arrives, the last value remains 
unchanged (it is the same materialized view)
Option 2) At moment X the output is 3 (the last value of the materialized view of 
1 2 3 is 3) and then at moment X+1 when 4 arrives, the last value is modified 
to 4 (it is a new materialized view and the output is as if we applied the SQL 
query on the batch case with all elements 1 2 3 4)

I would assume (based on previous discussions and the panel at Flink Forward) 
that we would rather go for option 2. The correct output of a SQL query on a 
stream is the one we would get by creating a materialized view at that point in 
time and applying the query in batch mode. When a new element arrives (the 
stream evolves), then we get a new materialized view.

If this is the case, as I assume, then I would say that SINGLE_VALUE should 
be continuously updated as the stream on top of which it is applied evolves. 

My 2cents (anyway - I think the discussion is very useful and hopefully 
applicable also for other operators/scenarios that we are going to implement)




-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Monday, January 30, 2017 2:09 PM
To: dev@flink.apache.org
Subject: Re: STREAM SQL inner queries

Hi Radu,

I think it is most important to get the semantics of a streaming query right.
In my opinion, the result of a streaming query should be equivalent to the 
result of a batch query that is executed on the materialized stream.
It should not matter whether you append the records received from a Kafka topic 
to a table and execute a batch query on that table or if you do run the same 
query continuously on the Kafka topic.

It is correct, that some queries become too expensive to compute if we 
implement these semantics.
However, this would be the price to pay for stream-batch consistent semantics.

Regarding the inner query case. I think a query should yield the same result, 
regardless of whether it is an inner or outer query.
This is one of the core principles of SQL, that I would not change.

Best, Fabian

2017-01-30 12:54 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi Fabian,
>
> Thanks for the link and for the remarks.
>
> I do not imagine the behavior of the inner query necessary on the 
> lines you describe. I specifically refer to " is applies as well for 
> the inner query. However, as the result of the inner query evolves, 
> also the result of the join needs to be constantly recomputed. Hence, 
> for every new result of (SELECT x FROM input1 ORDER BY time LIMIT 1), 
> we would need to emit an update for each record that was joined before."
>
> If we consider such a working scenario, then the behavior would be 
> something like the one below, if I understand correctly. Take for 
> example the query" STREAM amount, (SELECT id FROM  inputstream1) AS 
> field1 FROM inputstream2"
>
> Stream1     Stream2     Output
> Id1
>             User1,10    (10,Id1)
>             User2,11    (11,Id2)
> Id3                     (10,Id3), (11,Id3)
>             User3,9     (9,Id3)
> ...
>
> ...regardless of how we express the logic of the inner query (do we 
> use LIMIT 1, we don't), I would expect that the outputs that were 
> emitted are not retracted or modified in the future. In the previous 
> example the
> updates:  (10,Id3), (11, Id3) should never happen. With this in mind 
> although the inner query is translated to a LogicalJoin operator, the 
> functionality is more similar with a union or a coFlatMap, where we 
> only use one input as the holder for what to associate in the future 
> for the other. Anyway, I do not see the need to have any buffers (as 
> for the general case of joins) to compute the content for creating the 
> output from the inner quer

RE: STREAM SQL inner queries

2017-01-30 Thread Radu Tudoran
Hi Fabian,

Thanks for the link and for the remarks. 

I do not imagine the behavior of the inner query necessarily along the lines you
describe. I specifically refer to "This applies as well for the inner query.
However, as the result of the inner query evolves, also the result of the join
needs to be constantly recomputed. Hence, for every new result of (SELECT x
FROM input1 ORDER BY time LIMIT 1), we would need to emit an update for each
record that was joined before."

If we consider such a working scenario, then the behavior would be something
like the one below, if I understand correctly. Take for example the query
"SELECT STREAM amount, (SELECT id FROM inputstream1) AS field1 FROM inputstream2"

Stream1   Stream2    Output
Id1
          User1,10   (10,Id1)
          User2,11   (11,Id2)
Id3                  (10,Id3), (11, Id3)
          User3,9    (9,Id3)
...

...regardless of how we express the logic of the inner query (whether we use LIMIT
1 or not), I would expect that the outputs that were emitted are not retracted
or modified in the future. In the previous example, the updates (10,Id3) and
(11, Id3) should never happen. With this in mind, although the inner query is
translated to a LogicalJoin operator, the functionality is more similar to a
union or a coFlatMap, where we only use one input as the holder for what to
associate in the future with the other. Anyway, I do not see the need to have
any buffers (as for the general case of joins) to compute the content for
creating the output from the inner query.

Regarding your previous comment about failing based on the SINGLE_VALUE
verification: this is also something we simply need to agree on. After all, as
the implementation is decoupled from the parsing of the query, we can implement
either behavior: raise an error when a second element or update arrives on the
second stream, or just update the single-value state for future use.
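
To make the coFlatMap-style interpretation above concrete, here is a rough,
hypothetical sketch (all names and types are mine, not something agreed in this
thread) of a two-input function that keeps the latest value produced by the inner
query and attaches it to every record of the outer stream, updating rather than
failing when a new value arrives:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Input 1: result of the inner query (the id); Input 2: the outer records (the amount).
// Output: (amount, latest id); the id is null if nothing has been seen yet (left-join style).
public class SingleValueJoin implements CoFlatMapFunction<Long, Long, Tuple2<Long, Long>> {

    private Long singleValue; // latest value of the inner query; a real job would keep this in checkpointed state

    @Override
    public void flatMap1(Long innerValue, Collector<Tuple2<Long, Long>> out) {
        // alternative semantics: fail here if a second, different value arrives
        singleValue = innerValue;
    }

    @Override
    public void flatMap2(Long amount, Collector<Tuple2<Long, Long>> out) {
        // one output per outer record; previously emitted results are never retracted
        out.collect(Tuple2.of(amount, singleValue));
    }
}

(In an actual job the inner stream would probably be broadcast before connecting
the two streams; the sketch is only meant to illustrate the association semantics
discussed here.)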

All in all, I think we just need to clarify the expected behavior. Please
let me know what you think.

I agree with the approach of starting small - even with some very limited cases
in which we support inner queries - and then extending to or defining the general cases.


Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Monday, January 30, 2017 12:33 PM
To: dev@flink.apache.org
Subject: Re: STREAM SQL inner queries

Hi Radu,

I thought about your join proposal again and think there is an issue with the 
semantics.

The problem is that the result of a query is recomputed as new data arrives in 
the dynamic table.
This applies as well for the inner query. However, as the result of the inner 
query evolves, also the result of the join needs to be constantly recomputed. 
Hence, for every new result of (SELECT x FROM input1 ORDER BY time LIMIT 1), we 
would need to emit an update for each record that was joined before.

In order to prevent this, the join would need to have a time-based join
predicate defining that a tuple of the outer query should join with the current
value of the inner query at the time of its own timestamp. Such a predicate
can be expressed in SQL but this is quite cumbersome.

Julian Hyde (Apache Calcite committer) discussed similar use cases in a 
document and proposed something called Temporal Tables [1].
In some sense, the proposed dynamic tables are a special case of temporal table 
always reflecting the current point in time (i.e., not a previous point in 
time).

Best, Fabian

[1]
https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q

2017-01-27 21:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Radu,
>
> I was a bit surprised, that Calcite's parser accepted your query.
> Hence, I check the Calcite plan and and had look at the documentation 
&g

RE: Stream SQL and Dynamic tables

2017-01-27 Thread Radu Tudoran
Hi,

Thanks for the clarification Fabian - it is really useful. 
I agree that we should consolidate the module and avoid the need to further
maintain 3 different "projects". It does make sense to see the current (I would
call it) "Stream SQL" as a table with append semantics. However, one thing that
should be clarified is the best way, from the implementation point of view, to
keep the state of the table (if we can actually keep it - though the need is
clear for supporting retraction). As the input is a stream and the table is
append-only, we of course run into the classical unbounded-state issue that
streams have. What should be the approach?
Should we consider keeping the data in something like the state backend that is
now used for windows, and then pushing it to disk (e.g., like the FsStateBackend)?
Perhaps with the disk we can at least enlarge the horizon of what we keep.
I will give some comments and some thoughts in the document about this.  
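
For illustration only (nothing decided in this thread), a minimal sketch of how a
job could be pointed at a state backend that keeps its working state on disk; the
class names are from the DataStream API, the checkpoint paths are placeholders,
and option B needs the flink-statebackend-rocksdb dependency:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option A: working state on the heap, checkpointed to a file system.
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // Option B (pick one, not both): working state in RocksDB on local disk,
        // which is closer to "pushing the table state to disk".
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));
    }
}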


Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Thursday, January 26, 2017 3:37 PM
To: dev@flink.apache.org
Subject: Re: Stream SQL and Dynamic tables

Hi Radu,

the idea is to have dynamic tables as the common ground for Table API and SQL.
I don't think it is a good idea to implement and maintain 3 different 
relational APIs with possibly varying semantics.

Actually, you can see the current status of the Table API / SQL on stream as a 
subset of the proposed semantics.
Right now, all streams are implicitly converted into Tables with APPEND 
semantics. The currently supported operations (selection, filter, union, group 
windows) return streams.
The only thing that would change for these operations would be the output mode 
to be retraction mode by default in order to be able to emit updated records 
(e.g., updated aggregates due to late records).
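
To make the retraction mode concrete, a small hypothetical example (the encoding
here is mine, purely for illustration): an updated aggregate could be emitted as
flagged records, where the flag says whether the record is added or retracted.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;

public class RetractionExample {
    // A late record for key "userA" first retracts the outdated count and then
    // emits the updated one; true = add/accumulate, false = retract.
    public static List<Tuple2<Boolean, Tuple2<String, Long>>> lateUpdate() {
        return Arrays.asList(
            Tuple2.of(true,  Tuple2.of("userA", 3L)),  // result before the late record
            Tuple2.of(false, Tuple2.of("userA", 3L)),  // retract the outdated result
            Tuple2.of(true,  Tuple2.of("userA", 4L))   // emit the updated result
        );
    }
}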

The document is not final and we can of course discuss the proposal.

Best, Fabian

2017-01-26 11:33 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi all,
>
>
>
> I have a question with respect to the scope behind the initiative 
> behind relational queries on data streams:
>
> https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_
> f4konQPW4tnl8THw6rzGUdaqU/edit#
>
>
>
> Is the approach of using dynamic tables intended to replace the 
> implementation and mechanisms build now in stream sql ? Or will the 
> two co-exist, be built one on top of the other?
>
>
>
> Also – is the document in the final form or can we still provide 
> feedback / ask questions?
>
>
>
> Thanks for the clarification (and sorry if I missed at some point the 
> discussion that might have clarified this)
>
>
>
> Dr. Radu Tudoran
>
> Senior Research Engineer - Big Data Expert
>
> IT R Division
>
>
>
> [image: cid:image007.jpg@01CD52EB.AD060EE0]
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudo...@huawei.com <radu.tudo...@huawei.com>*
>
> Mobile: +49 15209084330 <+49%201520%209084330>
>
> Telephone: +49 891588344173 <+49%2089%201588344173>
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com Registered 
> Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing 
> Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz der Gesellschaft: 
> Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from 
> HUAWEI, which is intended only for the person or entity whose address 
> is listed above. Any use of the information contained herein in any 
> way (including, but not limited to, total or partial disclosure, 
> reproduction, or dissemination) by persons other than the intended 
> recipient(s) is prohibited. If you receive this e-mail in error, 
> please notify the sender by phone or email immediately and delete it!
>
>
>


RE: STREAM SQL inner queries

2017-01-27 Thread Radu Tudoran
Hi all,

Thanks for the feedback!

I agree that we should get the semantics right and afterwards we can implement it.
I think it would be quite useful. Now, regarding the remarks you made:


"> SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1 FROM 
inputstream2
you are suggesting that the subquery (SELECT id FROM  inputstream1) returns a 
single value (I assume the last received value). I would interpret the query 
differently and expect it to return the values of all rows of the inputstream1 
up to the current point in time.
"

It is a good point. It is not so much that I wanted to suggest that this should
be the syntax to use - I basically just relied on the logical operators that
Calcite has parsed the query into (JOIN + SINGLE_VALUE). Based on this logical
translation, I would say the correct implementation is to return one value, not
necessarily the whole content of the stream. Anyway, we are not restricted to
this, as we could potentially use different rules in Calcite to alter the
resulting plan.

However, if we decide that such queries should return the whole stream rather
than a single value, we are indeed tapping into the problem of potentially
unbounded cases. For this I do agree that the approach you proposed of relying on
dynamic tables is very good. In such a case we would just pass the entire
content of the dynamic table to the upper operators. For that matter, it also
works for the single value (as the table would contain only one value). However,
for the simple case of returning a single value we can provide an implementation
even now and do not need to wait until the full functionality of dynamic tables
is available.

At the same time, I also agree that the syntax "SELECT a FROM inputstream ORDER BY
time LIMIT 1" is elegant. I have no issue with considering inner queries to be
translated like this only when they have "LIMIT 1" specified, or only when they
are provided directly in such a form.

I will wait for additional remarks so that we all agree on specific semantics,
and then I will push this into a JIRA issue to be further reviewed and validated.

Best regards,



Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Thursday, January 26, 2017 4:51 PM
To: dev@flink.apache.org
Subject: Re: STREAM SQL inner queries

Hi everybody,

thanks for the proposal Radu.
If I understood it correctly, you are proposing a left join between a stream 
and a single value (which is compute from a stream).
This makes sense and should be a common use case.

However, I think some of your example queries do not return a single value as 
required for the join.

In your example:
> SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1 FROM
inputstream2

you are suggesting that the subquery (SELECT id FROM  inputstream1) returns a 
single value (I assume the last received value).
I would interpret the query differently and expect it to return the values of 
all rows of the inputstream1 up to the current point in time.
IMO, a query like "SELECT a FROM inputstream ORDER BY time LIMIT 1" would 
capture the semantics better.

The subquery
> (SELECT AVERAGE(amount) OVER (ORDER BY timestamp RANGE INTERVAL 1 HOUR
PRECEDING) AS hour_sum FROM inputstream)

has a similar problem and would return one row for each record of inputstream, 
i.e., not a single value.

Anyway, if we get the semantics of the query that computes the single value 
right, I think this type of join should be well covered by the dynamic table 
proposal.
The single value input will be a dynamic table (of constant size = 1) which is 
continuously updated by the engine.
Joining this table to to a dynamic (append) table will result in a continuously 
growing dynamic table, which can be emitted as a stream.

This would look very similar as you proposed but 

RE: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

2017-01-26 Thread Radu Tudoran
s accumulator, we have 
> >>>> also introduced
> >>>>   a retract API for UDAGG. With aggregate accumulator and retract
> >> API, I
> >>>> am
> >>>>   proposing a runtime approach to implement the OVER window as
> >>> followings.
> >>>>   4.
> >>>>  - We first implement a sorted state interface
> >>>>  - Per each group, we just create one sorted state. When a 
> >>>> new
> >>> record
> >>>>  arrives, it will insert into this sorted state, in the 
> >>>> meanwhile
> >> it
> >>>> will be
> >>>>  accumulated to the aggregate accumulator.
> >>>>  - For over window, we keep the aggregate accumulator for the
> >> entire
> >>>>  job lifelong time. This is different than the case where we
> >> delete
> >>>> the
> >>>>  accumulator for each group/window when a grouping-window is
> >>> finished.
> >>>>  - When an over window is up to trigger, we grab the
> >>>>  previous accumulator from the state and accumulate values 
> >>>> onto it with all
> >>>>  the records till the upperBoundary of the current window, 
> >>>> and retract all
> >>>>  the out of scope records till its lowerBoundary. We emit the
> >>>>  aggregate result and save the accumulator for the next window.
> >>>>
> >>>>
> >>>> Hello Fabian,
> >>>> I would suggest we should first start working on runtime design 
> >>>> of
> over
> >>>> window and aggregate. Once we have a good design there, one can 
> >>>> easily
> >>> add
> >>>> the support for SQL as well as tableAPI. What do you think?
> >>>>
> >>>> Regards,
> >>>> Shaoxuan
> >>>>
> >>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske 
> >>>> <fhue...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hi Radu,
> >>>>>
> >>>>> thanks for your comments!
> >>>>>
> >>>>> Yes, my intention is to open new JIRA issues to structure the 
> >>>>> development process. Everybody is very welcome to pick up issues 
> >>>>> and discuss the design proposals.
> >>>>> At the moment I see the following six issues to start with:
> >>>>>
> >>>>> - streaming SQL OVER ROW for processing time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> - streaming SQL OVER RANGE for processing time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> - streaming SQL OVER RANGE for event time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> For each of these windows we need corresponding translation 
> >>>>> rules and execution code.
> >>>>>
> >>>>> Subsequent JIRAs would be
> >>>>> - extending the Table API for supported SQL windows
> >>>>> - add support for FOLLOWING
> >>>>> - etc.
> >>>>>
> >>>>> Regarding the requirement for a sorted state. I am not sure if 
> >>>>> the OVER windows should be implemented using Flink's DataStream 
> >>>>> window
> >>>> framework.
> >>>>> We need a good design document to figure out what is the best 
> >>>>> approach. A ProcessFunction with a sorted state might be a good
> >>> solution
> >>>> as well.
> >>>>>
> >>>>> Best, Fabian
> >>>>>
> >>>>>
> >>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Thanks for starting these discussion - it is very useful.
> >>>>>> It does make sense indeed to refactor all these and coordinate 
> >>>>>> a
> >> bit
> >>>>>> the efforts not to have overlapping implementations and
> >> incompatible
> >>>>> solutions.
> >&

RE: [jira] [Created] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-01-26 Thread Radu Tudoran
Hi,

Can you please assign the JIRA issue to me? I can already start integrating the
implementation for this.


-Original Message-
From: Fabian Hueske (JIRA) [mailto:j...@apache.org] 
Sent: Thursday, January 26, 2017 2:44 PM
To: dev@flink.apache.org
Subject: [jira] [Created] (FLINK-5654) Add processing time OVER RANGE BETWEEN x 
PRECEDING aggregation to SQL

Fabian Hueske created FLINK-5654:


 Summary: Add processing time OVER RANGE BETWEEN x PRECEDING 
aggregation to SQL
 Key: FLINK-5654
 URL: https://issues.apache.org/jira/browse/FLINK-5654
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Fabian Hueske


The goal of this issue is to add support for OVER RANGE aggregations on 
processing time streams to the SQL interface.

Queries similar to the following should be supported:
{code}
SELECT
  a,
  SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB,
  MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS minB
FROM myStream
{code}

The following restrictions should initially apply:
- All OVER clauses in the same SELECT clause must be exactly the same.
- The PARTITION BY clause is optional (no partitioning results in single 
threaded execution).
- The ORDER BY clause may only have procTime() as parameter. procTime() is a 
parameterless scalar function that just indicates processing time mode.
- UNBOUNDED PRECEDING is not supported
- FOLLOWING is not supported.

The restrictions will be resolved in follow up issues. If we find that some of 
the restrictions are trivial to address, we can add the functionality in this 
issue as well.

This issue includes:
- Design of the DataStream operator to compute OVER ROW aggregates
- Translation from Calcite's RelNode representation (LogicalProject with 
RexOver expression).
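
(Not part of the ticket text - purely to make the intended runtime behavior
concrete, a naive, hypothetical sketch of such a DataStream operator; all names
are illustrative and the actual design is left to this issue. It assumes the
function runs on a stream keyed by the PARTITION BY attribute.)

{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emits SUM(b) over the last hour of processing time for every input record (c, b).
// Naive: O(n) work per record and a full rewrite of the state; no timers, no sorting.
public class ProcTimeBoundedSumFunction
        extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long RANGE_MS = 60 * 60 * 1000L; // RANGE INTERVAL '1' HOUR PRECEDING

    private transient ListState<Tuple2<Long, Long>> buffer; // (processing time, b)

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})));
    }

    @Override
    public void processElement(Tuple2<String, Long> in, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();

        // keep only the records that are still inside the range and sum them up
        List<Tuple2<Long, Long>> retained = new ArrayList<>();
        long sum = in.f1;
        Iterable<Tuple2<Long, Long>> current = buffer.get();
        if (current != null) {
            for (Tuple2<Long, Long> entry : current) {
                if (entry.f0 > now - RANGE_MS) {
                    retained.add(entry);
                    sum += entry.f1;
                }
            }
        }
        retained.add(Tuple2.of(now, in.f1));

        // rewrite the state with the pruned buffer
        buffer.clear();
        for (Tuple2<Long, Long> entry : retained) {
            buffer.add(entry);
        }

        out.collect(Tuple2.of(in.f0, sum));
    }
}
{code}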



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Stream SQL and Dynamic tables

2017-01-26 Thread Radu Tudoran
Hi all,

I have a question with respect to the scope of the initiative behind
relational queries on data streams:
https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU/edit#

Is the approach of using dynamic tables intended to replace the implementation
and mechanisms built now in stream SQL? Or will the two co-exist, or be built one
on top of the other?

Also - is the document in the final form or can we still provide feedback / ask 
questions?

Thanks for the clarification (and sorry if I missed at some point the 
discussion that might have clarified this)

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division

[cid:image007.jpg@01CD52EB.AD060EE0]
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com<http://www.huawei.com/>
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!



RE: STREAM SQL inner queries

2017-01-26 Thread Radu Tudoran
on the future Relational Queries 
on flink streaming. Fabian has drafted a very good design doc, 
https://goo.gl/m31kkE. The design is based on a new concept of a dynamic table
whose content changes over time and which can thereby be derived from streams.
With this dynamic table, stream queries can be done via regular (batch) SQL.
Besides some syntax sugar, there is not too much difference between a batch
query and a stream query (in terms of the what and where of a query's execution).
A stream query has additional characteristics in terms of when to emit a result
and how to refine the result considering retraction.

Hope this helps and look forward to working with you on streaming SQL.

Regards,
Shaoxuan


On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran <radu.tudo...@huawei.com>
wrote:

> Hi all,
>
> I would like to open a jira issue (and then provide the 
> implementation) for supporting inner queries. The idea is to be able 
> to support SQL queries as the ones presented in the scenarios below. 
> The key idea is that supporting inner queries would require to have the 
> implementation for:
>
> -> JOIN (type = left and condition = true) - Basically this is a simple
> implementation for a join function between 2 streams that does not 
> require any window support behind the scenes as there is no condition 
> on which to perform the join
>
> -> SINGLE_VALUE - this operator would require to provide one value to
> be furthered joined. In the context of streaming this value should 
> basically evolve with the contents of the window. This could be 
> implemented with a flatmap function as left joins would allow also to 
> do the mapping with null values
>
> We can then extend this initial and simple implementation to provide 
> support for joins in general (conditional joins, right joins..) or we 
> can isolate this implementation for this specific case of inner 
> queries and go with a totally new design for stream to stream joins 
> (might be needed depending on what is the decision behind on how to 
> support the conditional
> mapping)
>
> What do you think about this?
>
> Examples of scenarios to apply
>
> SELECT STREAM amount,
> (SELECT id FROM  inputstream1) AS field1 FROM inputstream2
>
> Translated to
> LogicalProject(amount=[$1], c=[$4])
> LogicalJoin(condition=[true], joinType=[left])
>   LogicalTableScan(table=[[inputstream1]])
>   LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> LogicalProject(user_id=[$0])
>   LogicalTableScan(table=[[inputstream2]])
>
> Or from the same stream - perhaps interesting for applying some more 
> complex operations within the inner query SELECT STREAM amount, 
> (SELECT id FROM  inputstream1) AS field1 FROM inputstream1
>
> Translated to
> LogicalProject(amount=[$1], c=[$4])
> LogicalJoin(condition=[true], joinType=[left])
>   LogicalTableScan(table=[[inputstream1]])
>   LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> LogicalProject(user_id=[$0])
>   LogicalTableScan(table=[[inputstream1]])
>
> Or used to do the projection
> SELECT STREAM amount, c  FROM (SELECT *,id AS c FROM inputstream1)
>
> Translated to
>   LogicalProject(amount=[$1], c=[$5])
> LogicalProject(time=[$0], amount =[$1], date=[$2], id =[$4], c=[$5])
>   LogicalTableScan(table=[[inputstream1]])
>
>
> Or in the future even
> SELECT STREAM amount, myagg FROM  (SELECT STREAM *, SUM(amount) OVER 
> window AS myagg FROM inputstream1)) ...
>
>
>
>


STREAM SQL inner queries

2017-01-25 Thread Radu Tudoran
Hi all,

I would like to open a JIRA issue (and then provide the implementation) for
supporting inner queries. The idea is to be able to support SQL queries like the
ones presented in the scenarios below. The key idea is that supporting inner
queries would require implementations for:

-> JOIN (type = left and condition = true) - basically a simple implementation
of a join function between 2 streams that does not require any window support
behind the scenes, as there is no condition on which to perform the join

-> SINGLE_VALUE - this operator would need to provide one value to be further
joined. In the context of streaming, this value should basically evolve with the
contents of the window. This could be implemented with a flatMap function, as
left joins would also allow mapping with null values

We can then extend this initial and simple implementation to provide support
for joins in general (conditional joins, right joins, ...), or we can isolate this
implementation to this specific case of inner queries and go with a totally new
design for stream-to-stream joins (which might be needed depending on the
decision on how to support the conditional mapping).

What do you think about this?

Examples of scenarios to apply

SELECT STREAM amount,
(SELECT id FROM  inputstream1) AS field1
FROM inputstream2

Translated to
LogicalProject(amount=[$1], c=[$4])
LogicalJoin(condition=[true], joinType=[left])
  LogicalTableScan(table=[[inputstream1]])
  LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
LogicalProject(user_id=[$0])
  LogicalTableScan(table=[[inputstream2]])

Or from the same stream - perhaps interesting for applying some more complex 
operations within the inner query
SELECT STREAM amount,
(SELECT id FROM  inputstream1) AS field1
FROM inputstream1

Translated to
LogicalProject(amount=[$1], c=[$4])
LogicalJoin(condition=[true], joinType=[left])
  LogicalTableScan(table=[[inputstream1]])
  LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
LogicalProject(user_id=[$0])
  LogicalTableScan(table=[[inputstream1]])

Or used to do the projection
SELECT STREAM amount, c  FROM (SELECT *,id AS c FROM inputstream1)

Translated to
  LogicalProject(amount=[$1], c=[$5])
LogicalProject(time=[$0], amount =[$1], date=[$2], id =[$4], c=[$5])
  LogicalTableScan(table=[[inputstream1]])


Or in the future even
SELECT STREAM amount, myagg FROM  (SELECT STREAM *, SUM(amount) OVER window AS 
myagg FROM inputstream1))
...





RE: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

2017-01-25 Thread Radu Tudoran
ort for SQL as well as tableAPI. What do you think?
> >
> > Regards,
> > Shaoxuan
> >
> > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> >
> > > Hi Radu,
> > >
> > > thanks for your comments!
> > >
> > > Yes, my intention is to open new JIRA issues to structure the 
> > > development process. Everybody is very welcome to pick up issues 
> > > and discuss the design proposals.
> > > At the moment I see the following six issues to start with:
> > >
> > > - streaming SQL OVER ROW for processing time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > - streaming SQL OVER RANGE for processing time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > - streaming SQL OVER RANGE for event time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > For each of these windows we need corresponding translation rules 
> > > and execution code.
> > >
> > > Subsequent JIRAs would be
> > > - extending the Table API for supported SQL windows
> > > - add support for FOLLOWING
> > > - etc.
> > >
> > > Regarding the requirement for a sorted state. I am not sure if the 
> > > OVER windows should be implemented using Flink's DataStream window
> > framework.
> > > We need a good design document to figure out what is the best 
> > > approach. A ProcessFunction with a sorted state might be a good
> solution
> > as well.
> > >
> > > Best, Fabian
> > >
> > >
> > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for starting these discussion - it is very useful.
> > > > It does make sense indeed to refactor all these and coordinate a 
> > > > bit the efforts not to have overlapping implementations and 
> > > > incompatible
> > > solutions.
> > > >
> > > > If you close the 3 jira issues you mentioned - do you plan to 
> > > > redesign them and open new ones? Do you need help from our side 
> > > > - we can also pick the redesign of some of these new jira 
> > > > issues. For example we already
> > > have
> > > > an implementation for this and we can help with the design.
> > > > Nevertheless, let's coordinate the effort.
> > > >
> > > > Regarding the support for the different types of window - I 
> > > > think the
> > > best
> > > > option is to split the implementation in small units. We can 
> > > > easily do
> > > this
> > > > from the transformation rule class and with this each particular 
> > > > type of window (session/sliding/sliderows/processing time/...) 
> > > > will have a clear implementation and a corresponding 
> > > > architecture within
> > the jira issue?
> > > What
> > > > do you think about such a granularity?
> > > >
> > > > Regarding the issue of " Q4: The implementaion of SlideRows 
> > > > still need a custom operator that collects records in a priority 
> > > > queue ordered by the "rowtime", which is similar to the design 
> > > > we discussed in FLINK-4697, right? "
> > > > Why would you need this operator? The window buffer can act to 
> > > > some
> > > extent
> > > > as a priority queue as long as the trigger and evictor is set to 
> > > > work
> > > based
> > > > on the rowtime - or maybe I am missing something... Can you 
> > > > please
> > > clarify
> > > > this.
> > > >
> > > >
> > > > Dr. Radu Tudoran
> > > > Senior Research Engineer - Big Data Expert IT R Division
> > > >
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center 
> > > > Riesstrasse 25, 80992 München
> > > >
> > > > E-mail: radu.tudo...@huawei.com
> > > > Mobile: +49 15209084330
> > > > Telephone: +49 891588344173
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH Hansaallee 205, 40549 
> > > > Düsseldorf, Germany, www.huawei.com Registered Office: 
> > > > Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing 
> > > > Director: Bo PENG, Wanzhou MENG

RE: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

2017-01-24 Thread Radu Tudoran
Hi Fabian,

Thanks for the feedback and for the clarifications with respect to the new JIRA
issues.
How should we proceed from here? Will you start creating these, or can we also
take over the design of some of these issues?

I am particularly interested in "streaming SQL OVER RANGE for processing time",
if you want to share the workload :)


Regarding the topic of implementing OVER windows based on Flink's DataStream
window framework versus using something like ProcessFunction, I have a slightly
different opinion: I believe that windows are more suitable to support this
implementation. This is because:
- stream windows are semantically richer and more evolved than the Stream SQL
windows, so in principle they can cover all cases
- if we were to extend the ProcessFunction to provide this implementation we
would have 2 cases:
	1) aggregates over the whole stream <<e.g. SELECT STREAM COUNT(*) FROM
streaminput>>, for which we can indeed easily rely on a ProcessFunction
	2) aggregates with bounds <<COUNT(*) OVER streaminput (RANGE INTERVAL '10'
MINUTE PRECEDING)>>, where the aggregate needs to be updated based on the
contents of the "window". This means that if we tried to implement this behavior
with a ProcessFunction we would end up adding the window buffer to it - which
makes it an actual window operator.
-> I believe we could potentially just define a special type of window for this
case and still rely on the rich semantics that exist in windows. We could define
this SlidingRowWindow or RollingWindow - whatever the name would be - as a
specialized window for this. We can perhaps enhance the window with a sorting
function. Let me know what you think about this?
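
To illustrate the window-based direction, a rough sketch (made-up names, and only
one of several ways to wire it): a running aggregate over a time range can
already be approximated with a GlobalWindow that fires on every element and
evicts records older than the range. Note that TimeEvictor relies on record
timestamps, so the processing-time case needs exactly the kind of care discussed
in this thread, and recomputing the aggregate over the whole buffer per record is
of course not efficient.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class OverRangeWithWindows {

    // input: (key, value); output per record: (key, sum of the values of the last 10 minutes)
    public static DataStream<Tuple2<String, Long>> runningSum(DataStream<Tuple2<String, Long>> input) {
        return input
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> t) {
                        return t.f0;                              // PARTITION BY
                    }
                })
                .window(GlobalWindows.create())                   // one ever-growing window buffer
                .trigger(CountTrigger.of(1))                      // emit an updated aggregate per record
                .evictor(TimeEvictor.of(Time.minutes(10)))        // RANGE INTERVAL '10' MINUTE PRECEDING
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, GlobalWindow>() {
                    @Override
                    public void apply(String key, GlobalWindow window,
                                      Iterable<Tuple2<String, Long>> records,
                                      Collector<Tuple2<String, Long>> out) {
                        long sum = 0L;
                        for (Tuple2<String, Long> r : records) {
                            sum += r.f1;
                        }
                        out.collect(Tuple2.of(key, sum));
                    }
                });
    }
}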


Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Tuesday, January 24, 2017 3:43 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for 
streaming tables

Hi Radu,

thanks for your comments!

Yes, my intention is to open new JIRA issues to structure the development 
process. Everybody is very welcome to pick up issues and discuss the design 
proposals.
At the moment I see the following six issues to start with:

- streaming SQL OVER ROW for processing time
  - bounded PRECEDING
  - unbounded PRECEDING

- streaming SQL OVER RANGE for processing time
  - bounded PRECEDING
  - unbounded PRECEDING

- streaming SQL OVER RANGE for event time
  - bounded PRECEDING
  - unbounded PRECEDING

For each of these windows we need corresponding translation rules and execution 
code.

Subsequent JIRAs would be
- extending the Table API for supported SQL windows
- add support for FOLLOWING
- etc.

Regarding the requirement for a sorted state. I am not sure if the OVER windows 
should be implemented using Flink's DataStream window framework.
We need a good design document to figure out what is the best approach. A 
ProcessFunction with a sorted state might be a good solution as well.

Best, Fabian


2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi all,
>
> Thanks for starting these discussion - it is very useful.
> It does make sense indeed to refactor all these and coordinate a bit 
> the efforts not to have overlapping implementations and incompatible 
> solutions.
>
> If you close the 3 jira issues you mentioned - do you plan to redesign 
> them and open new ones? Do you need help from our side - we can also 
> pick the redesign of some of these new jira issues. For example we 
> already have an implementation for this and we can help with the 
> design. Nevertheless, let's coordinate the effort.
>
> Regarding the support for the different types of window - I think the 
> best option is to split the implementation in small units. We can 
> easily do this from the transformation rule class an

RE: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

2017-01-24 Thread Radu Tudoran
Hi all,

Thanks for starting this discussion - it is very useful.
It does indeed make sense to refactor all of this and coordinate the efforts a
bit so that we do not end up with overlapping implementations and incompatible solutions.

If you close the 3 JIRA issues you mentioned - do you plan to redesign them and
open new ones? Do you need help from our side? We can also pick up the redesign
of some of these new JIRA issues. For example, we already have an implementation
for this and we can help with the design. Nevertheless, let's coordinate the
effort.

Regarding the support for the different types of windows - I think the best
option is to split the implementation into small units. We can easily do this
from the transformation rule class, and with this each particular type of window
(session/sliding/sliderows/processing time/...) will have a clear
implementation and a corresponding architecture within its JIRA issue. What do
you think about such a granularity?

Regarding the issue of " Q4: The implementaion of SlideRows still need a custom 
operator that collects records in a priority queue ordered by the "rowtime", 
which is similar to the design we discussed in FLINK-4697, right? "
Why would you need this operator? The window buffer can act to some extent as a
priority queue as long as the trigger and evictor are set to work based on the
rowtime - or maybe I am missing something... Can you please clarify this?


Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Jark Wu [mailto:wuchong...@alibaba-inc.com] 
Sent: Tuesday, January 24, 2017 6:53 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for 
streaming tables

Hi Fabian, 

Thanks for bringing up this discussion and the nice approach to avoid 
overlapping contributions.

All of these make sense to me. But I have some questions.

Q1: If I understand correctly, we will not support TumbleRows and SessionRows 
at the beginning. But maybe support them as a syntax sugar (in Table API) when 
the SlideRows is supported in the future. Right ? 

Q2: How to support SessionRows based on SlideRows ?  I don't get how to 
partition on "gap-separated".

Q3: Should we break down the approach into smaller tasks for streaming tables 
and batch tables ? 

Q4: The implementation of SlideRows still needs a custom operator that collects
records in a priority queue ordered by the "rowtime", which is similar to the
design we discussed in FLINK-4697, right?

+1 not support for OVER ROW for event time at this point.

Regards, Jark


> On 24 January 2017, at 10:28 AM, Hongyuhong <hongyuh...@huawei.com> wrote:
> 
> Hi,
> We are also interested in streaming sql and very willing to participate and 
> contribute.
> 
> We are now in progress and we will also contribute to calcite to push forward 
> the window and stream-join support.
> 
> 
> 
> --
> Sender: Fabian Hueske [mailto:fhue...@gmail.com] Send Time: 24 January 2017
> 5:55
> Receiver: dev@flink.apache.org
> Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows 
> for streaming tables
> 
> Hi Haohui,
> 
> our plan was in fact to piggy-back on Calcite and use the TUMBLE function [1] 
> once is it is available (CALCITE-1345 [2]).
> Unfortunately, this issue does not seem to be very active, so I don't know 
> what the progress is.
> 
> I would suggest to move the discussion about group windows to a separate 
> thread and keep this one focused on the organization of the SQL OVER windows.
> 
> Best,
> Fabian
> 
> [1] http://calcite.apache.org/docs/stream.html)
> [2] https://issues.apache.org/jira/browse/CALCITE-1345
> 
> 2017-01-23 22:42 GMT+01:00 Haohui Mai <ricet...@gmail.com>:
> 
>> Hi Fabian,
>> 
>> FLINK-4692 has added the support for tumbling window and we are 
&g

WindowRunner?

2016-11-04 Thread Radu Tudoran
Hi,

I am working on an implementation for SQL stream windows. I wanted to ask for
your opinion: do you think it is better to have a WindowRunner, just like we have
the FlatMapRunner and FlatJoinRunner, or do you think it could potentially be
implemented on top of the existing ones...?

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R Division

[cid:image007.jpg@01CD52EB.AD060EE0]
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com<http://www.huawei.com/>
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!



RE: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

2016-10-07 Thread Radu Tudoran
mp
>> > > > information that might have been previously set on the original
>> > > > StreamRecord - is there any other way to recreate StreamRecord?
>> > > >
>> > > >
>> > > >
>> > > > 2. Regarding ProcessingTimeEvictor -
>> > > >
>> > > > A TimeEvictor has to evict elements from the window which are older
>> > than
>> > > a
>> > > > given Period from the element with maximum timestamp in the window.
>> > When
>> > > > considering ProcessingTimestamp(even if it was explicitly set),
>> > shouldn't
>> > > > the timestamp associated with records be strictly increasing. i.e.,
>> > newer
>> > > > elements should have higher timestamp than earlier elements. So to
>> get
>> > > the
>> > > > max timestamp we could just get the last element. When using
>> > > > EventTimeEvictor, the elements might have arrived out of order
>> hence we
>> > > > can't just take the timestamp of the last element as maximum
>> timestamp,
>> > > but
>> > > > check each and every element in the window.
>> > > >
>> > > > We should have two versions of TimeEvictors - EventTime and
>> > > ProcessingTime,
>> > > > but does ProcessingTimeEvictor need to take a Tupel2<Long,T> since
>> > > anyways
>> > > > we are going to get the max timestamp by looking at the last
>> element in
>> > > the
>> > > > window?.
>> > > >
>> > > > Thanks,
>> > > > Vishnu
>> > > >
>> > > > On Fri, Jul 29, 2016 at 6:22 AM, Aljoscha Krettek <
>> aljos...@apache.org
>> > >
>> > > > wrote:
>> > > >
>> > > > > About processing time and timestamps:
>> > > > >
>> > > > > The timestamp is either set in the source of in an
>> > > > > in-between TimestampAssigner that can be used with
>> > > > > DataStream.assignTimestampsAndWatermarks(). However, the
>> timestamp in
>> > > the
>> > > > > element is normally not a "processing-time timestamp". I think it
>> > might
>> > > > > make sense to split the functionality for the evictors into two
>> > parts:
>> > > > one
>> > > > > that implicitly sets a timestamp and one that uses these
>> timestamps.
>> > It
>> > > > > could look like this:
>> > > > >
>> > > > > DataStream input = ...
>> > > > > // this makes the current processing time explicit in the tuples:
>> > > > > DataStream<Tuple2<Long, T>> withTimestamps = input.map(new
>> > > > > ReifyProcessingTIme());
>> > > > > withTimestamps
>> > > > >   .keyBy(...)
>> > > > >   .window(..)
>> > > > >   .evictor(new ProcessingTimeEvictor())
>> > > > >   .apply(...)
>> > > > >
>> > > > > where ProcessingTimeEvictor looks like this:
>> > > > >
>> > > > > class ProcessingTimeEvictor extends Evictor<Tuple2<Long, T>> {
>> > > > >   void evictBefore(Iterable<Tuple2<Long, T>>, ...);
>> > > > >   void evictAfter ...
>> > > > > }
>> > > > >
>> > > > > This would make everything that is happening explicit in the type
>> > > > > signatures and explicit for the user.
>> > > > >
>> > > > > Cheers,
>> > > > > Aljoscha
>> > > > >
>> > > > > On Thu, 28 Jul 2016 at 18:32 Aljoscha Krettek <
>> aljos...@apache.org>
>> > > > wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > > in fact, changing it to Iterable would simplify things
>> because
>> > > then
>> > > > > we
>> > > > > > would not have to duplicate code for the EvictingWindowOperator
>> any
>> > > > more.
>> > > > > > It could be a very thin subclass of WindowOperator.
>> > > > > >
>> > > > > > Cheers,
>> > > > > > Aljoscha
>> > > > > >
>> > > > > > On Wed, 27 Jul 2016 at 03:56 Vishnu Viswa

RE: [DISCUSS] Timely User Functions and Watermarks

2016-09-27 Thread Radu Tudoran
Hi Aljoscha,

My 2 cents on this would be that it is worth maintaining the access to the 
watermarks. I think having the option to customize this is a strong point of 
Flink.

Regarding the solution you proposed based on two-input timers ("would fire if the
watermark from both inputs advances sufficiently far"), I would propose to have
the option to set a strategy for the timer. We could have:
- EagerTrigger - the timer fires when any of the input watermarks has advanced
sufficiently far
- LeftEagerTrigger - the timer fires when the left input watermark has advanced
sufficiently far
- RightEagerTrigger - the timer fires when the right input watermark has advanced
sufficiently far
- SyncTrigger - the timer fires only when the watermarks from both inputs have
advanced sufficiently far


We could potentially also include custom handling of the watermarks here, under a
CustomTrigger strategy implemented as an operator that can be provided.
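
To make the proposed strategies concrete, a tiny hypothetical sketch (names are
mine) of how the watermark against which a two-input operator's timers are
checked could be derived under each strategy:

public final class TimerWatermarks {

    public enum Strategy { EAGER, LEFT_EAGER, RIGHT_EAGER, SYNC }

    /** Watermark against which the operator's event-time timers would be evaluated. */
    public static long timerWatermark(Strategy strategy, long watermark1, long watermark2) {
        switch (strategy) {
            case EAGER:       return Math.max(watermark1, watermark2); // any input advanced far enough
            case LEFT_EAGER:  return watermark1;                       // only the left input counts
            case RIGHT_EAGER: return watermark2;                       // only the right input counts
            case SYNC:                                                 // both inputs must have advanced
            default:          return Math.min(watermark1, watermark2);
        }
    }
}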


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Tuesday, September 27, 2016 11:28 AM
To: Dev
Subject: [DISCUSS] Timely User Functions and Watermarks

Hi Folks,
I'm in the process of implementing
https://issues.apache.org/jira/browse/FLINK-3674 and now I'm having a bit of a 
problem with deciding how watermarks should be treated for operators that have 
more than one input.

The problem is deciding when to fire event-time timers. For one-input operators 
it's pretty straightforward: fire once the watermark surpasses a given timer. 
For two-input operators we allow the operator implementer to observe the 
watermarks from the two inputs separately and react to that and also to decide 
what watermark to forward. With this it becomes hard to figure out when to fire 
timers.

My thinking is that we should not allow operators to observe the watermark 
anymore but route it past the operator and deal with watermarks and timers 
outside of the operator. A timer for operators with more than one input
(TwoInputOperator) would fire if the watermark from both inputs advances 
sufficiently far.

Alternatively, we could still let operators observe watermarks but grab the 
watermark before it enters the operator and still deal with timers in the same 
way as proposed above.

Any feedback on this is very welcome! What would you expect to happen for 
timers of operators with more than one input?

Cheers,
Aljoscha

P.S. One reason why I want to deal with watermarks outside of operators is
that otherwise every operator has to implement the functionality to update the
current watermark at the timer service, i.e. something like this:

@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        timerService.updateWatermark(mark); // *<--- that's the thing I don't want*
        output.emitWatermark(mark);
    }
}

This becomes more complicated for two input operators which also do the merging 
of the watermarks from the two inputs right now.


RE: [DISCUSS] Allowed Lateness in Flink

2016-07-28 Thread Radu Tudoran
Hi,

IMHO we should still maintain user-specific triggers, and I think there will
always be corner cases where a very specific trigger will need to be constructed.
That being said, I think the idea of also supporting a state machine to be
generated for the trigger is very good. Will you start a FLIP document for this?



-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Thursday, July 28, 2016 3:47 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Allowed Lateness in Flink

Another (maybe completely crazy) idea is to regard the triggers really as a DSL 
and use compiler techniques to derive a state machine that you use to do the 
actual triggering.

With this, the "trigger" objects that make up the tree of triggers would not 
contain any logic themselves. A trigger specification such as 
And(AfterWatermark, Count(5)) would simply be an AST of our "trigger language" 
and from this we derive that our trigger waits on the watermark and also until 
the element count is at least 5. We would generate a compact state machine for 
this that is updated with incoming elements and sometimes (that's the somewhat
tricky part) checked for whether we should fire.
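
Just to illustrate what such an AST could look like (a hypothetical sketch, not a
proposal for the actual DSL classes):

// A trigger specification is just data; none of the triggering logic lives in these classes.
public abstract class TriggerExpr {

    public static final class AfterWatermark extends TriggerExpr { }

    public static final class Count extends TriggerExpr {
        public final long count;
        public Count(long count) { this.count = count; }
    }

    public static final class And extends TriggerExpr {
        public final TriggerExpr left;
        public final TriggerExpr right;
        public And(TriggerExpr left, TriggerExpr right) {
            this.left = left;
            this.right = right;
        }
    }

    // e.g. And(AfterWatermark, Count(5)): the system would compile this tree into a
    // compact state machine (here: a per-window count plus a "watermark passed" flag)
    // instead of invoking user trigger code.
    public static TriggerExpr example() {
        return new And(new AfterWatermark(), new Count(5));
    }
}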

The advantages of this are:
 - Possibly very tight state representation that is known at job specification 
time.
 - No dealing with user specified triggers since our DSL is strictly specified 
by us

The disadvantages are:
 - No user specified triggers inside the DSL

The last part would be mitigated by still allowing users to write triggers for 
the current Trigger API if they want/need all the power that that provides.

Just some thoughts...

Cheers,
Aljoscha

On Tue, 26 Jul 2016 at 14:31 Kostas Kloudas 
wrote:

> And also I think that the shouldFire has to take as an additional 
> argument the time. This will help differentiate between ON_TIME and 
> EARLY, LATE firings.
>
> > On Jul 26, 2016, at 11:02 AM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
> >
> > Hello,
> >
> > This is a nice proposal that I think covers most of the cases.
> > The only thing that is missing would be a method:
> >
> > void onFire(Window window, TriggerContext ctx)
> >
> > that will be responsible for doing whatever is necessary if the 
> > windowOperator decides to fire. You can imagine resetting the 
> > counter of a countTrigger to 0.
> >
> > As a recap, the SimpleTrigger interface should be:
> >
> > class SimpleTrigger {
> > void onElement(T element, long timestamp, W window, TriggerContext 
> > ctx); boolean shouldFire(W window, TriggerContext cox);
> >
> > void onMerge(W window, OnMergeContext cox); void onFire(Window 
> > window, TriggerContext ctx) void clear(W window, TriggerContext 
> > ctx); }
> >
> > The onMerge and onFire methods can be seen as callbacks and will be 
> > applied upon merge (in case of Session windows) and upon firing.
> >
> > What do you think?
> >
> > Kostas
> >
> >> On Jul 25, 2016, at 3:34 PM, Aljoscha Krettek 
> wrote:
> >>
> >> Hi,
> >> yes, this is essentially the solution I had in my head but I went a 
> >> bit further and generalized it.
> >>
> >> Basically, to make triggers composable they should have this 
> >> interface, let's call it SimpleTrigger for now:
> >>
> >> class SimpleTrigger {
> >> void onElement(T element, long timestamp, W window, TriggerContext 
> >> ctx); boolean shouldFire(W window, TriggerContext ctx); void 
> >> onMerge(W window, OnMergeContext ctx); void clear(W window, 
> >> TriggerContext ctx); }
> >>
> >> notice how onElement() cannot return a TriggerResult anymore and 
> >> how
> >> onEventTime() and onProcessingTime() of the currently existing 
> >> Trigger interface were folded into shouldFire(). Each trigger 
> >> essentially
> becomes a
> >> predicate that says at any given time whether they would fire the
> window.
> >> Having just one method that can decide whether to fire or not makes
> these
> >> easily composable to form complex triggers, thus enabling the 
> >> trigger
> DSL
> >> we want to implement.
> >>
> >> The way to go about implementing this is either to replace our 
> >> current Trigger interface by this new interface or to keep our more 
> >> powerful interface with all the customization options and have one 
> >> SimpleTriggerTrigger that can execute a tree of SimpleTriggers. A 
> >> rough sketch of this would be this:
> >> https://gist.github.com/aljoscha/66b0fcab89cd2b6190a63899f461067f
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >>
> >>
> >> On Mon, 25 Jul 2016 at 14:33 Kostas Kloudas <
> k.klou...@data-artisans.com>
> >> wrote:
> >>
> >>> Hi Aljoscha,
> >>>
> >>> This was exactly one of the problems I also found.
> >>>
> >>> The way I was thinking about it is the following:
> >>>
> >>> Conceptually, time (event and processing) advances but state is a 
> >>> fixed property of the window.
> >>>
> >>> Given this, I modified the Count trigger to also ask for the 
> >>> current 

RE: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

2016-07-25 Thread Radu Tudoran
Hi Aljoscha,

Can you point us to the way it is handled now? Is there anything else for
removing elements other than the skip in EvictingWindowOperator? Is there
something like there was before version 1.x, where you had an explicit remove
from the window buffers?

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Monday, July 25, 2016 11:45 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

Hi,
I think there is not yet a clear specification for how the actual removal
of elements from the buffer will work. I think naively one can do:

Iterable<E> currentElements = state.get();
// this will remove some elements from the buffer, or mark them for removal
evictor.evict(currentElements);

state.clear();
// the Iterable does not loop over the removed/marked elements
for (E element : currentElements) {
  state.add(element);
}

This is very costly but the only way I see of doing this right now with
every state backend.

Cheers,
Aljoscha
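
[Editor's note: wrapped up as a helper, the read/clear/re-add pattern described above might look roughly like this. It is only a sketch, assuming a Flink ListState handle; the predicate is a stand-in for whatever the evictor decides to remove.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

import org.apache.flink.api.common.state.ListState;

final class StateEviction {

    // Sketch of the "read, clear, re-add" pattern: keep only the elements the
    // evictor did not remove. Costly, because the whole buffer is rewritten.
    static <T> void evictAndRewrite(ListState<T> state, Predicate<T> shouldEvict) throws Exception {
        List<T> survivors = new ArrayList<>();
        Iterable<T> current = state.get();        // some backends may return null for empty state
        if (current != null) {
            for (T element : current) {           // pass 1: collect the elements to keep
                if (!shouldEvict.test(element)) {
                    survivors.add(element);
                }
            }
        }
        state.clear();                            // drop the old buffer entirely
        for (T element : survivors) {             // pass 2: write the survivors back
            state.add(element);
        }
    }
}
```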

On Mon, 25 Jul 2016 at 09:46 Radu Tudoran <radu.tudo...@huawei.com> wrote:

> Hi,
>
> Thanks for the clarification. Can someone point to where the events are
> removed from buffers - I am trying to understand the new logic of handling
> the eviction in this new API. Thanks
>
>
>
> -Original Message-
> From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com]
> Sent: Saturday, July 23, 2016 3:04 AM
> To: Dev
> Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
>
> Hi Radu,
>
> - Yes, we can remove elements from the iterator.
> - Right now the EvictingWindowOperator just skips the elements from the
> Iterable before passing them to the window function (yes, this has to be
> changed in the new API).
> - Regarding the last question on how elements are being removed from
> the window buffer: I am not sure how it is working right now, but when
> trying out the new API that I am working on, I did find a bug where the
> evicted elements are not actually removed from the State. I have added a
> fix for that. (You can see a mail regarding that in this mail chain.)
>
> Thanks,
> Vishnu
>
> On Fri, Jul 22, 2016 at 1:03 PM, Radu Tudoran <radu.tudo...@huawei.com>
> wrote:
>
> > Hi,
> >
> > Overall I believe that the interfaces and the proposal is good. I have
> the
> > following question though: can you delete via the iterator
> > (Iterable<StreamRecord> elements) the elements?
> >
> > I tried to look over the code where the eviction happens (I did not do
> > these since version 0.10...looks very different now :) )...the only
> > reference I found was the EvictingWindowOperator which at the
> > fireOrContinue has a "skip" based on the number of elements returned from
> > the evictor...and these are not put in the collection to be given to the
> > user function to be applied. I think these will also need to be changed
> to
> > adjust to the "any operator from anywhere in the window buffer".
> > Also - as we are on this topic - can someone explain how these elements
> > that are not consider anymore for the user function are actually deleted
> > from the window buffer?..i did not manage to find this.. some reference
> to
> > classes/code where this happens would be useful
> >
> >
> > Dr. Radu Tudoran
> > Research Engineer - Big Data Expert
> > IT R Division
> >
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudo...@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205,

RE: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

2016-07-25 Thread Radu Tudoran
Hi,

Thanks for the clarification. Can someone point to where the events are removed 
from the buffers? I am trying to understand the new logic of handling eviction 
in this new API. Thanks.



-Original Message-
From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com] 
Sent: Saturday, July 23, 2016 3:04 AM
To: Dev
Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

Hi Radu,

- Yes, we can remove elements from the iterator.
- Right now the EvictingWindowOperator just skips the elements from the
Iterable before passing them to the window function (yes, this has to be
changed in the new API).
- Regarding the last question on how elements are being removed from
the window buffer: I am not sure how it is working right now, but when
trying out the new API that I am working on, I did find a bug where the
evicted elements are not actually removed from the State. I have added a
fix for that. (You can see a mail regarding that in this mail chain.)

Thanks,
Vishnu

On Fri, Jul 22, 2016 at 1:03 PM, Radu Tudoran <radu.tudo...@huawei.com>
wrote:

> Hi,
>
> Overall I believe that the interfaces and the proposal is good. I have the
> following question though: can you delete via the iterator
> (Iterable<StreamRecord> elements) the elements?
>
> I tried to look over the code where the eviction happens (I did not do
> these since version 0.10...looks very different now :) )...the only
> reference I found was the EvictingWindowOperator which at the
> fireOrContinue has a "skip" based on the number of elements returned from
> the evictor...and these are not put in the collection to be given to the
> user function to be applied. I think these will also need to be changed to
> adjust to the "any operator from anywhere in the window buffer".
> Also - as we are on this topic - can someone explain how these elements
> that are not consider anymore for the user function are actually deleted
> from the window buffer?..i did not manage to find this.. some reference to
> classes/code where this happens would be useful
>
>
> Dr. Radu Tudoran
> Research Engineer - Big Data Expert
> IT R Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
> -Original Message-
> From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com]
> Sent: Friday, July 22, 2016 12:43 PM
> To: Dev
> Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
>
> Hi,
>
> I have created a FLIP page for this enhancement
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
>
> Thanks,
> Vishnu
>
> On Thu, Jul 21, 2016 at 6:53 AM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
> > Thanks Aljoscha.
> >
> > On Thu, Jul 21, 2016 at 4:46 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> Hi,
> >> this, in fact, seems to be a bug. There should be something like
> >> windowState.clear();
> >> for (IN element: projectedContents) {
> >>    windowState.add(element);
> >> }
> >>
> >> after passing the elements to the window function.
> >>
> >> This is very inefficient but the only way I see of doing it right now.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >>
> >> On Thu, 21 Jul 2016 at 01:32 Vishnu Viswanath <
> >> vishnu.viswanat...@gmail.com>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > When we use RocksDB as state backend, how does the backend state get
> >> > updated after some elements are evicted from the window?
> >> > I don't see any update call being made to remove the element from the
> >> state
> 

RE: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

2016-07-22 Thread Radu Tudoran
Hi,

Overall I believe that the interfaces and the proposal are good. I have the 
following question though: can you delete the elements via the iterator 
(Iterable<StreamRecord> elements)?

I tried to look over the code where the eviction happens (I have not done this 
since version 0.10... it looks very different now :) ). The only reference I found 
was the EvictingWindowOperator, which at fireOrContinue has a "skip" based 
on the number of elements returned from the evictor... and these are not put in 
the collection that is given to the user function to be applied. I think this 
will also need to be changed to adjust to the "any operator from anywhere in 
the window buffer".
Also - as we are on this topic - can someone explain how these elements that 
are no longer considered for the user function are actually deleted from the 
window buffer? I did not manage to find this... some reference to the classes/code 
where this happens would be useful.


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com] 
Sent: Friday, July 22, 2016 12:43 PM
To: Dev
Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink

Hi,

I have created a FLIP page for this enhancement
https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor

Thanks,
Vishnu

On Thu, Jul 21, 2016 at 6:53 AM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:

> Thanks Aljoscha.
>
> On Thu, Jul 21, 2016 at 4:46 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> this, in fact, seems to be a bug. There should be something like
>> windowState.clear();
>> for (IN element: projectedContents) {
>>    windowState.add(element);
>> }
>>
>> after passing the elements to the window function.
>>
>> This is very inefficient but the only way I see of doing it right now.
>>
>> Cheers,
>> Aljoscha
>>
>>
>> On Thu, 21 Jul 2016 at 01:32 Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > When we use RocksDB as state backend, how does the backend state get
>> > updated after some elements are evicted from the window?
>> > I don't see any update call being made to remove the element from the
>> state
>> > stored in RocksDB.
>> >
>> > It looks like the RocksDBListState is only having get() and add()
>> methods
>> > since it is an AppendingState, but that causes the evicted elements to
>> come
>> > back when the trigger is fired next time. (It works fine when I use
>> > MemoryStateBackend)
>> >
>> > Is this expected behavior or am I missing something.
>> >
>> > Thanks,
>> > Vishnu
>> >
>> > On Mon, Jul 18, 2016 at 7:15 AM, Vishnu Viswanath <
>> > vishnu.viswanat...@gmail.com> wrote:
>> >
>> > > Hi Aljoscha,
>> > >
>> > > Thanks! Yes, I have the create page option now in wiki.
>> > >
>> > > Regards,
>> > > Vishnu Viswanath,
>> > >
>> > > On Mon, Jul 18, 2016 at 6:34 AM, Aljoscha Krettek <
>> aljos...@apache.org>
>> > > wrote:
>> > >
>> > >> @Radu, addition of more window types and sorting should be part of
>> > another
>> > >> design proposal. This is interesting stuff but I think we should keep
>> > >> issues separated because things can get complicated very quickly.
>> > >>
>> > >> On Mon, 18 Jul 2016 at 12:32 Aljoscha Krettek <aljos...@apache.org>
>> > >> wrote:
>> > >>
>> > &

Re: Flink Table & SQL doesn't work in very simple example

2016-07-20 Thread Radu Tudoran
Hi,

I am also using v1.1... with Eclipse.

I will re-download the source and build it again.
Is there also a binary version of 1.1? I would like to test against that as 
well, particularly if the issue persists.

Otherwise, I am downloading and building the version from the main git branch...


From: Timo Walther
To: dev@flink.apache.org
Date: 2016-07-20 13:55:32
Subject: Re: Flink Table & SQL doesn't work in very simple example

I also tried it again with the latest 1.1-SNAPSHOT and everything works.
This Maven issue has been solved in FLINK-4111.



Am 20/07/16 um 13:43 schrieb Suneel Marthi:
> I am not seeing an issue with this code Radu, this is from present
> 1.1-Snapshot.
>
> This is what I have and it works (running from within IntelliJ and not cli)
> :
>
>
> List<Tuple3<Long, String, Integer>> input = new ArrayList<>();
> input.add(new Tuple3<>(3L, "test", 1));
> input.add(new Tuple3<>(5L, "test2", 2));
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
> DataStream<Tuple3<Long, String, Integer>> ds = env.fromCollection(input);
>
> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>
> tableEnv.registerDataStream("Words", ds, "frequency, word, pos");
> // run a SQL query on the Table and retrieve the result as a new Table
> Table result = tableEnv.sql("SELECT STREAM word, pos FROM Words WHERE frequency > 2");
>
>
>
>
> On Wed, Jul 20, 2016 at 6:55 AM, Radu Tudoran <radu.tudo...@huawei.com>
> wrote:
>
>> Hi,
>>
>> As far as I managed to isolate the cause of the error so far it has to do
>> with some mismatch in the function call
>>
>> val traitDefs:ImmutableList[RelTraitDef[_ <: RelTrait]] =
>> config.getTraitDefs
>>
>> I am not sure though why it is not working, because when I tried to make a
>> dummy test by creating a program and calling that function, everything
>> worked.
>> Can it be that there is some overlap between libraries that contain
>> the ImmutableList type?
>> google/common/collect/ImmutableList (with flink shaded)?
>> As per the error
>> "/apache/flink/shaded/calcite/com/google/common/collect/ImmutableList;"
>>
>>
>> -Original Message-
>> From: Maximilian Michels [mailto:m...@apache.org]
>> Sent: Wednesday, July 20, 2016 11:52 AM
>> To: dev@flink.apache.org
>> Cc: Timo Walther
>> Subject: Re: Flink Table & SQL doesn't work in very simple example
>>
>> CC Timo who I know is working on Table API and SQL.
>>
>>
>>
>> On Tue, Jul 19, 2016 at 6:14 PM, Radu Tudoran <radu.tudo...@huawei.com>
>> wrote:
>>> Hi,
>>>
>>> I am not sure that this problem was solved. I am using the last pom to
>> compile the table API.
>>> I was trying to run a simple program.
>>>
>>>
>>> ArrayList<Tuple3<Long, String, Integer>> input = new ArrayList<Tuple3<Long, String, Integer>>();
>>> input.add(new Tuple3<Long, String, Integer>(3L, "test", 1));
>>> input.add(new Tuple3<Long, String, Integer>(5L, "test2", 2));
>>>
>>> DataStream<Tuple3<Long, String, Integer>> ds = env.fromCollection(input);
>>>
>>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>
>>> tableEnv.registerDataStream("Words", ds, "frequency, word, position");
>>> // run a SQL query on the Table and retrieve the result as a new Table
>>> Table result = tableEnv.sql(
>>>     "SELECT STREAM product, amount FROM Words WHERE frequency > 2");
>>>
>>>
>>>
>>> ..and I get:
>>>
>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>> org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/shaded/calcite/com/google/common/collect/ImmutableList;
>>>     at org.apache.flink.api.table.FlinkPlannerImpl.<init>(FlinkPlannerImpl.scala:50)
>>>     at org.apache.flink.api.table.StreamTableEnvironment.sql(StreamTableEnvironment.scala:127)
>>>     at TestStreamSQL.main(TestStreamSQL.java:69)
>>>
>>>
>>> Any thoughts on how this can be solved?
>>>
>>>
>>> Dr. Radu Tudoran
>>> Research Engineer - Big Data

RE: Flink Table & SQL doesn't work in very simple example

2016-07-19 Thread Radu Tudoran
Hi,

I am not sure that this problem was solved. I am using the last pom to compile 
the table API.

I was trying to run a simple program.


ArrayList<Tuple3<Long, String, Integer>> input = new ArrayList<Tuple3<Long, String, Integer>>();
input.add(new Tuple3<Long, String, Integer>(3L, "test", 1));
input.add(new Tuple3<Long, String, Integer>(5L, "test2", 2));

DataStream<Tuple3<Long, String, Integer>> ds = env.fromCollection(input);

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

tableEnv.registerDataStream("Words", ds, "frequency, word, position");
// run a SQL query on the Table and retrieve the result as a new Table
Table result = tableEnv.sql(
    "SELECT STREAM product, amount FROM Words WHERE frequency > 2");



..and I get:

Exception in thread "main" java.lang.NoSuchMethodError: 
org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/shaded/calcite/com/google/common/collect/ImmutableList;
    at org.apache.flink.api.table.FlinkPlannerImpl.<init>(FlinkPlannerImpl.scala:50)
    at org.apache.flink.api.table.StreamTableEnvironment.sql(StreamTableEnvironment.scala:127)
    at TestStreamSQL.main(TestStreamSQL.java:69)


Any thoughts on how this can be solved?


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Thursday, June 23, 2016 11:13 AM
To: dev@flink.apache.org
Subject: Re: Flink Table & SQL doesn't work in very simple example

Hi Jark Wu,

yes, that looks like a dependency issue.
Can you open a JIRA for it and set "Fix Version" to 1.1.0? This issue should be 
resolved for the 1.1 release.

Thanks, Fabian

2016-06-22 3:52 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:

> Hi,
>
>
> I’m trying to use Flink Table 1.1-SNAPSHOT where I want to use Table 
> API and SQL in my project. But when I run the very simple example 
> WordCountTable, I encountered the following exception :
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.calcite.rel.logical.LogicalAggregate.getGroupSets()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList;
>   at org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule.matches(DataSetAggregateRule.scala:47)
>   at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:269)
>   at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:253)
>   at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1542)
>   at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1817)
>   at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038)
>   at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058)
>   at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:723)
>   at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:331)
>   at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:250)
>   at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
>   at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
>   at com.alibaba.flink.examples.WordCountTable$.main(WordCountTable.scala:43)
>   at com.alibaba.flink.examples.WordCountTable.main(WordCountTable.scala)
>
>
> It seems that something is wrong with our guava shade. Do you have any ideas?
>
> My pom fi

RE: [DISCUSS] FLIP-2 Extending Window Function Metadata

2016-07-18 Thread Radu Tudoran
Hi,

Sorry - I made a mistake - I was thinking of getting access to the collection 
(mis-read :) collector) of events in the window buffer, in order to be able to 
delete/evict some of them, which are not necessarily the last ones.


Radu

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Monday, July 18, 2016 5:54 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-2 Extending Window Function Metadata

What about the collector? This is only used for emitting elements to the 
downstream operation.

On Mon, 18 Jul 2016 at 17:52 Radu Tudoran <radu.tudo...@huawei.com> wrote:

> Hi,
>
> I think it looks good and most importantly is that we can extend it in 
> the directions discussed so far.
>
> One question though regarding the Collector - are we going to be able 
> to delete random elements from the list if this is not exposed as a 
> collection, at least to the evictor? If not, how are we going to 
> extend in the future to cover this case?
>
> Regarding the ordering - I also observed that there are situations 
> where elements do not have a logical order. One example is if you have 
> high rates of the events. Nevertheless, even if now is not the time 
> for this, I think in the future we can imagine having also some data 
> structures that offer some ordering. It can save some computation 
> efforts later in the functions for some use cases.
>
>
> Best regards,
>
>
> -Original Message-
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Monday, July 18, 2016 3:45 PM
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] FLIP-2 Extending Window Function Metadata
>
> I incorporated the changes. The proposed interface of 
> ProcessWindowFunction is now this:
>
> public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
>     implements Function {
>
>   public abstract void process(KEY key, Iterable<IN> elements, Context ctx) throws Exception;
>
>   public abstract class Context {
>     public abstract W window();
>     public abstract void output(OUT value);
>   }
> }
>
> I'm proposing to not expose Collector anymore because it has the 
> close() method that should not be called by users. Having the output() 
> call directly on the context should work just as well.
>
> Also, I marked the "adding a firing reason" and "adding firing 
> counter" as future work that are only examples of stuff that can be 
> implemented on top of the new interface. Initially, this will provide 
> exactly the same features as the old API but be extensible. I did this 
> to not make the scope of this proposal too big because Radu also 
> suggested more changes and each of them should be covered in a separate 
> design doc or FLIP.
>
> @Radu: On the different buffer types. I think this would be very tricky.
> Right now, people should also not rely on the fact that elements are 
> "FIFO". Some state backends might keep the elements in a different 
> order and when you have merging windows/session windows the order of 
> the elements will also not be preserved.
>
> Cheers,
> Aljoscha
>
> On Wed, 13 Jul 2016 at 18:40 Radu Tudoran <radu.tudo...@huawei.com> wrote:
>
> > Hi,
> >
> > If it is to extend the Context to pass more information between the 
> > stages of processing a window (triggering -> process -> eviction), 
> > why not adding also a "EvictionInfo"? I think this might actually 
> > help with the issues discussed in the tread related to the eviction policy.
> > I could imagine using this parameter to pass the conditions, from 
> > the processing stage to the evictor, about what events to be eliminated.
> >
> >
> >
> >
> > public abstract class Context {
> >
> >public abstract EvictionInfo evictionInfo();
> >
> > ...
> >
> >
> >public abstract KEY key();
> >
> >public abstract W window();
> >
> >public abstract int id();
> >
> >public abstract FiringInfo firingInfo();
> >
> >public abstract Iterable elements();
> >
> >public abstract void output(OUT value);
> >
> > }
> >
> >
> > Also on a slightly unrelated issue - how hard it would be to 
> > introduce different types of buffers for the windows. Currently the 
> > existing one is behaving (when under processing) similar with a FIFO 
> > queue (in the sense that you need to start from beginning, from the oldest 
> > element).
> > How about enabling for example also LIFO behavior (start iterating 
> > through the list from the most recent element). As in the sou

RE: [DISCUSS] FLIP-2 Extending Window Function Metadata

2016-07-18 Thread Radu Tudoran
Hi,

I think it looks good and most importantly is that we can extend it in the 
directions discussed so far.

One question though regarding the Collector - are we going to be able to delete 
random elements from the list if this is not exposed as a collection, at least 
to the evictor? If not, how are we going to extend in the future to cover this 
case?

Regarding the ordering - I also observed that there are situations where 
elements do not have a logical order. One example is if you have high rates of 
the events. Nevertheless, even if now is not the time for this, I think in the 
future we can imagine having also some data structures that offer some 
ordering. It can save some computation efforts later in the functions for some 
use cases. 


Best regards,


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Monday, July 18, 2016 3:45 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-2 Extending Window Function Metadata

I incorporated the changes. The proposed interface of ProcessWindowFunction is 
now this:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
    implements Function {

  public abstract void process(KEY key, Iterable<IN> elements, Context ctx) throws Exception;

  public abstract class Context {
    public abstract W window();
    public abstract void output(OUT value);
  }
}
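
[Editor's note: to illustrate what user code against this proposed interface could look like, here is a rough sketch. It follows the generic order <IN, OUT, KEY, W> from above; Tuple2 and TimeWindow are existing Flink types, everything else is hypothetical until the FLIP is implemented.]

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Counts the elements per key in each window and emits (key, count) through the
// Context instead of a Collector, as proposed above. Sketch only, not a final API.
public class CountPerKey extends ProcessWindowFunction<String, Tuple2<String, Long>, String, TimeWindow> {

    @Override
    public void process(String key, Iterable<String> elements, Context ctx) throws Exception {
        long count = 0;
        for (String ignored : elements) {
            count++;
        }
        // ctx.window() exposes window metadata, ctx.output(...) emits the result
        ctx.output(new Tuple2<>(key, count));
    }
}
```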

I'm proposing to not expose Collector anymore because it has the close() method 
that should not be called by users. Having the output() call directly on the 
context should work just as well.

Also, I marked the "adding a firing reason" and "adding firing counter" as 
future work that are only examples of stuff that can be implemented on top of 
the new interface. Initially, this will provide exactly the same features as 
the old API but be extensible. I did this to not make the scope of this 
proposal too big because Radu also suggested more changes and each of them 
should be covered in a separate design doc or FLIP.

@Radu: On the different buffer types. I think this would be very tricky.
Right now, people should also not rely on the fact that elements are "FIFO". 
Some state backends might keep the elements in a different order and when you 
have merging windows/session windows the order of the elements will also not be 
preserved.

Cheers,
Aljoscha

On Wed, 13 Jul 2016 at 18:40 Radu Tudoran <radu.tudo...@huawei.com> wrote:

> Hi,
>
> If it is to extend the Context to pass more information between the 
> stages of processing a window (triggering -> process -> eviction), why 
> not adding also a "EvictionInfo"? I think this might actually help 
> with the issues discussed in the tread related to the eviction policy. 
> I could imagine using this parameter to pass the conditions, from the 
> processing stage to the evictor, about what events to be eliminated.
>
>
>
>
> public abstract class Context {
>
>public abstract EvictionInfo evictionInfo();
>
> ...
>
>
>public abstract KEY key();
>
>public abstract W window();
>
>public abstract int id();
>
>public abstract FiringInfo firingInfo();
>
>public abstract Iterable elements();
>
>public abstract void output(OUT value);
>
> }
>
>
> Also on a slightly unrelated issue - how hard it would be to introduce 
> different types of buffers for the windows. Currently the existing one 
> is behaving (when under processing) similar with a FIFO queue (in the 
> sense that you need to start from beginning, from the oldest element). 
> How about enabling for example also LIFO behavior (start iterating 
> through the list from the most recent element). As in the source 
> queues or stacks are not actually used, perhaps we can just pass 
> policies to the iterator - or have custom itrators
>
>
>
>
>
>
> Dr. Radu Tudoran
> Research Engineer - Big Data Expert
> IT R Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com Registered 
> Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing 
> Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz der Gesellschaft: 
> Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN This e-mail and 
> its attachments contain confidential information from HUAWEI, which is 
> intended only for the person or entity whose address is listed above. 
> Any use of the information contained herein in any way (including, but 
> not limited to, total or partial disclosure, reproductio

RE: [DISCUSS] FLIP-2 Extending Window Function Metadata

2016-07-13 Thread Radu Tudoran
Hi,

If it is to extend the Context to pass more information between the stages of 
processing a window (triggering -> process -> eviction), why not adding also a 
"EvictionInfo"? I think this might actually help with the issues discussed in 
the tread related to the eviction policy. I could imagine using this parameter 
to pass the conditions, from the processing stage to the evictor, about what 
events to be eliminated. 




public abstract class Context {

   public abstract EvictionInfo evictionInfo();

...


   public abstract KEY key();

   public abstract W window();

   public abstract int id();

   public abstract FiringInfo firingInfo();

   public abstract Iterable elements();

   public abstract void output(OUT value);

}


Also, on a slightly unrelated issue - how hard would it be to introduce 
different types of buffers for the windows? Currently the existing one 
behaves (when under processing) similar to a FIFO queue (in the sense that 
you need to start from the beginning, from the oldest element). How about enabling, 
for example, also LIFO behavior (start iterating through the list from the most 
recent element)? Since in the source queues or stacks are not actually used, 
perhaps we can just pass policies to the iterator - or have custom iterators.






Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Wednesday, July 13, 2016 2:24 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-2 Extending Window Function Metadata

Sure, I also thought about this but went for the "extreme" initially. If no-one 
objects I'll update the doc in a bit.

On Wed, 13 Jul 2016 at 14:17 Stephan Ewen <se...@apache.org> wrote:

> Thanks for opening this.
>
> I see the need for having an extensible context object for window 
> function invocations, but i think hiding every parameter in the 
> context is a bit unnatural.
>
> How about having a function "apply(Key, Values, WindowContext, Collector)"
> ?
> It should be possible to write the straightforward use cases without 
> accessing the context object.
>
>
>
> On Wed, Jul 13, 2016 at 1:56 PM, Aljoscha Krettek 
> <aljos...@apache.org>
> wrote:
>
> > Hi,
> > this is a proposal to introduce a new interface for the window 
> > function
> to
> > make it more extensible for the future where we might want to 
> > provide additional information about why a window fired to the user 
> > function:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Win
> dow+Function+Metadata
> >
> > I'd appreciate your thoughts!
> >
> > Cheers,
> > Aljoscha
> >
>


RE: [DISCUSS] Enhance Window Evictor in Flink

2016-07-07 Thread Radu Tudoran
Hi,

@Aljoscha - I can understand the reason why you are hesitant to introduce 
"slower" windows such as the ones that would maintain sorted items, or windows 
with bindings between the different entities (evictor, trigger, window, apply 
function). However, I think it's possible just to create more types of windows. 
The existing ones (time windows, global windows ...) can remain, and we can just add 
some more flavors of windows where more features or more functionality are 
enabled (e.g., access to each element in the evictor; the possibility to 
delete or mark elements for eviction in the function...).

Regarding the specific case of sorted windows, I think the N log N complexity 
to sort (the worst case) is very unlikely. In fact, you have almost sorted 
items/arrays. Moreover, if you consider that in iteration X all elements were 
sorted, then in iteration X+1 you only need to sort the newly arrived 
elements (M). I would expect this number M to be significantly smaller 
than N (the elements that already exist). Then, using an insertion sort for these new 
elements, you would have M * N complexity, and if M << N then the complexity is 
O(N). Alternatively, you can use binary search to find the insertion point and 
further reduce the comparison cost to O(log N) per inserted element.
If M is proportional to N, then you can sort M and use merge sort for combining.
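
[Editor's note: a rough sketch of the incremental idea above, illustrative only and not a Flink API: keep the buffer sorted and insert each of the M new elements at the position found by binary search. With an array-backed list the shift on insert still costs O(N) per element, so this mainly saves comparisons.]

```java
import java.util.Collections;
import java.util.List;

final class SortedInsert {

    // Inserts the newly arrived timestamps into an already-sorted buffer.
    static void insertSorted(List<Long> sortedBuffer, List<Long> newElements) {
        for (Long ts : newElements) {
            int pos = Collections.binarySearch(sortedBuffer, ts);
            if (pos < 0) {
                pos = -(pos + 1);  // binarySearch returns -(insertionPoint) - 1 when the key is absent
            }
            sortedBuffer.add(pos, ts);
        }
    }
}
```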


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-Original Message-
From: 吕文龙(吕文龙) [mailto:wenlong@alibaba-inc.com] 
Sent: Thursday, July 07, 2016 11:59 AM
To: dev@flink.apache.org
Subject: 答复: [DISCUSS] Enhance Window Evictor in Flink

Hi,
I think it is necessary to support a sorted window, which can avoid scanning all 
the elements of the window when trying to evict elements; such scans may cost many IO 
operations, such as querying DBs to get elements from state.
What's more, when a window aggregation function is invertible, such as sum, 
which can be updated by adding or removing a single record, window results can 
be incrementally calculated. In this kind of case, we can dramatically improve 
the performance of window aggregation if the evictor can trigger an update of the 
window aggregation state by some mechanism.

Best Wishes!
---
wenlong.lwl
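
[Editor's note: a minimal sketch of the invertible-aggregate idea above: a sum that is maintained incrementally by adding a record when it enters the window and retracting it when the evictor removes it. The names are illustrative, this is not an existing Flink interface.]

```java
// Invertible window aggregate: the result can be updated in O(1) per added or
// evicted record, so the window buffer never has to be rescanned for the sum.
public class IncrementalSum {

    private long sum;

    public void add(long value) {      // record enters the window
        sum += value;
    }

    public void retract(long value) {  // record is evicted from the window
        sum -= value;
    }

    public long getResult() {
        return sum;
    }
}
```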

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Thursday, July 07, 2016 17:32
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Enhance Window Evictor in Flink

Hi,
regarding "sorting the window by event time": I also considered this but in the 
end I don't think it's necessary. Sorting is rather expensive and making 
decisions based on the order of elements can be tricky. An extreme example of 
why this can be problematic is the case where all elements in the window have 
the same timestamp. Now, if you decide to evict the first 5 elements based on 
timestamp order you basically arbitrarily evict 5 elements. I think the better 
solution for doing time-based eviction is to do one pass over the elements to 
get an overview of the timestamp distribution, then do a second pass and evict 
elements based on what was learned in the first pass. This has complexity 2*n 
compared to the n*log n (plus the work of actually deciding what to evict) of 
the sort based strategy.
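
[Editor's note: a sketch of that two-pass idea, assuming the evictor is handed an Iterable of StreamRecord whose iterator supports remove(), as discussed elsewhere in this thread; the keepSpan parameter and class name are illustrative.]

```java
import java.util.Iterator;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

final class TwoPassEviction {

    // Pass 1 learns the timestamp range, pass 2 evicts everything older than a
    // cutoff derived from it. No sorting of the pane is needed.
    static <T> void evictOlderThan(Iterable<StreamRecord<T>> elements, long keepSpan) {
        long maxTimestamp = Long.MIN_VALUE;
        for (StreamRecord<T> record : elements) {            // pass 1
            maxTimestamp = Math.max(maxTimestamp, record.getTimestamp());
        }
        long cutoff = maxTimestamp - keepSpan;
        Iterator<StreamRecord<T>> it = elements.iterator();  // pass 2
        while (it.hasNext()) {
            if (it.next().getTimestamp() < cutoff) {
                it.remove();   // relies on the Iterable supporting removal
            }
        }
    }
}
```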

I might be wrong, though, and there could be a valid use-case not covered by 
the above idea.

regarding Vishnu's other use case of evicting based on some decision in the
WindowFunction: could this be solved by doing the check for the pattern in the 
evictor itself instead of in the window function? I'm very hesitant to 
introduce a coupling between the different components of the windowing system, 
i.e. assigner, trigger, evictor and window function. The reason is that using 
an evictor has a huge performance impact since the system always has to keep 
all elements and cannot to incremental aggregation of window results and I 
therefore don't want to put specific features regarding eviction into the other 
components.

Cheers,
Aljoscha

On Thu

RE: [DISCUSS] Enhance Window Evictor in Flink

2016-07-07 Thread Radu Tudoran
Hi,

I think the situation Vishnu raised is something that should be accounted for. It 
can indeed happen that you want to condition what you evict from the window 
based on the result of the function to be applied.

My 2 cents...
I would suggest adding a list for the elements of the stream where you can MARK 
them to be deleted. Alternatively, the iterator can be extended to have a 
function Iterator.markForEviction(int). These can be made available also in the 
apply function. Moreover, we can use this to extend the functionality such that 
you add MARKs either for eviction after the function has finished triggering or 
for eviction in the next iteration.
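
[Editor's note: a rough sketch of what such a marking mechanism could look like. All names here are hypothetical, not an existing Flink API.]

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Wrapper around the window buffer: the window function can mark positions for
// eviction, and the evictor later keeps only the unmarked elements.
class MarkingBuffer<T> {

    private final List<T> elements;
    private final BitSet marked = new BitSet();

    MarkingBuffer(List<T> elements) {
        this.elements = elements;
    }

    void markForEviction(int index) {   // e.g. called from the apply function
        marked.set(index);
    }

    List<T> survivors() {               // what remains after eviction
        List<T> out = new ArrayList<>();
        for (int i = 0; i < elements.size(); i++) {
            if (!marked.get(i)) {
                out.add(elements.get(i));
            }
        }
        return out;
    }
}
```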


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com] 
Sent: Thursday, July 07, 2016 1:28 AM
To: Dev
Subject: Re: [DISCUSS] Enhance Window Evictor in Flink

Thank you Maxim and Aljoscha.

Yes, the beforeEvict and afterEvict should be able to address point 3.

I have one more use case in my mind (which I might have to do in the later 
stages of POC).
What if the `evictAfter` should behave differently based on the window function.

For example.
I have a window that got triggered and my evict function is being called after 
the apply function. In such cases I should be able to decide on what I should 
evict based on the window function.
e.g.,
let the window have elements of type `case class Item(id: String, type:
String)`  and let the types be `type1` and `type2`.
If the window function is able to find a sequence `type1 type2 type1`, then evict 
all elements of type `type2`;
or, if the window function is able to find a sequence `type2 type2 type1`, then 
evict all elements of type `type1`; otherwise don't evict any elements.

Is this possible? Or, at least, could the window function choose between two 
Evictor functions (one for the success case and one for the failure case)?
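
[Editor's note: one way this could be sketched is to do the pattern check inside the evictor itself and evict through the iterator. The Item type mirrors the case class from the example above; everything else is illustrative, not an existing Flink API.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class PatternEviction {

    static final class Item {
        final String id;
        final String type;
        Item(String id, String type) { this.id = id; this.type = type; }
    }

    // Pass 1 records the sequence of types, pass 2 evicts the elements matching
    // whichever rule fired. Relies on the iterator supporting remove().
    static void evictByPattern(Iterable<Item> elements) {
        List<String> types = new ArrayList<>();
        for (Item item : elements) {
            types.add(item.type);
        }
        boolean evictType2 = Collections.indexOfSubList(types, Arrays.asList("type1", "type2", "type1")) >= 0;
        boolean evictType1 = !evictType2
                && Collections.indexOfSubList(types, Arrays.asList("type2", "type2", "type1")) >= 0;

        Iterator<Item> it = elements.iterator();
        while (it.hasNext()) {
            String type = it.next().type;
            if ((evictType2 && type.equals("type2")) || (evictType1 && type.equals("type1"))) {
                it.remove();
            }
        }
    }
}
```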

@Maxim:
regarding the sorted window, actually I wanted my elements to be sorted but not 
for the eviction but while applying the window function (so thought this could 
be done easily). But it would be good to have the window sorted based on 
EventTime.


Thanks and Regards,
Vishnu Viswanath,




On Wed, Jul 6, 2016 at 3:55 PM, Maxim <mfat...@gmail.com> wrote:

> Actually for such evictor to be useful the window should be sorted by 
> some field, usually event time. What do you think about adding sorted 
> window abstraction?
>
> On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek 
> <aljos...@apache.org>
> wrote:
>
> > @Maxim: That's perfect I didn't think about using Iterator.remove() 
> > for that. I'll update the doc. What do you think Vishnu? This should 
> > also
> cover
> > your before/after case nicely.
> >
> > @Vishnu: The steps would be these:
> >  - Converge on a design in this discussion
> >  - Add a Jira issue here: 
> > https://issues.apache.org/jira/browse/FLINK
> >  - Work on the code an create a pull request on github
> >
> > The steps are also outlined here
> > http://flink.apache.org/how-to-contribute.html and here 
> > http://flink.apache.org/contribute-code.html.
> >
> > -
> > Aljoscha
> >
> > On Wed, 6 Jul 2016 at 19:45 Maxim <mfat...@gmail.com> wrote:
> >
> > > The new API forces iteration through every element of the buffer 
> > > even
> if
> > a
> > > single value to be evicted. What about implementing 
> > > Iterator.remove() method for elements? The API would look like:
> > >
> > > public interface Evictor<T, W extends Window> extends Serializable 
> > > {
> > >
> > >/**
> > > *  Optionally evicts elements. Called before windowing function.
> > > *
> > > * @param elements The elements currently in the pane. Us

RE: [ANNOUNCE] Flink 1.0.0 has been released

2016-03-08 Thread Radu Tudoran
Hi,

Do you also have a LinkedIn post that I could share - or should I make a 
blog post in which I pick up this announcement? 



Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Kostas Tzoumas [mailto:ktzou...@apache.org] 
Sent: Tuesday, March 08, 2016 4:17 PM
To: u...@flink.apache.org; dev@flink.apache.org; n...@flink.apache.org
Subject: [ANNOUNCE] Flink 1.0.0 has been released

Hi everyone!

As you might have noticed, Apache Flink 1.0.0 has been released and announced!

You can read more about the release at the ASF blog and the Flink blog
-
https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
- http://flink.apache.org/news/2016/03/08/release-1.0.0.html

Don't forget to retweet and spread the news :-)
- https://twitter.com/TheASF/status/707174116969857024
- https://twitter.com/ApacheFlink/status/707175973482012672

Check out the changelog and the migration guide, download the release, and 
check out the documentation
- http://flink.apache.org/blog/release_1.0.0-changelog_known_issues.html
-
https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x
- https://cwiki.apache.org/confluence/display/FLINK/Stability+Annotations
- http://flink.apache.org/downloads.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.0/

Many congratulations to the Flink community for making this happen!

Best,
Kostas


RE: Off-heap memory in Flink?

2015-07-31 Thread Radu Tudoran
Hi,

Is there some info or description about how this off-heap memory is managed and 
what its goals are?
Thanks

Dr. Radu Tudoran
Research Engineer
IT RD Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN

-Original Message-
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Friday, July 31, 2015 4:02 PM
To: dev@flink.apache.org
Subject: Re: Off-heap memory in Flink?

The Pull Request is basically ready. I would like to benchmark a bit before 
merging it.

The on-heap Flink-managed memory classes are highly optimized to be JIT 
friendly. Just want to make sure that we don't lose that.

I have worked a lot on streaming issues lately, so this is still in my backlog.

On Fri, Jul 31, 2015 at 2:09 PM, Maximilian Michels m...@apache.org wrote:

 Hi Slim,

 Off-heap memory has been postponed because it's not a pressing but 
 rather a nice-to-have feature. I know that Stephan continued to work 
 on the off-heap memory. I think we can get it in sometime this year.

 Best,
 Max

 On Fri, Jul 31, 2015 at 11:57 AM, Slim Baltagi sbalt...@gmail.com wrote:

  Hi
 
  I remember seeing that using off-heap memory was on Flink’s roadmap 
  as
 well
  as a related pull request https://github.com/apache/flink/pull/290
 
  Any update on such effort?
 
  Thanks
 
  Slim Baltagi
 
 
 
  --
  View this message in context:
 
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Off-heap-memory-in-Flink-tp7285.html
  Sent from the Apache Flink Mailing List archive. mailing list 
  archive at Nabble.com.