Re: Joining and windowing multiple streams using DataStream API or Table API & SQL

2021-02-22 Thread Pieter Bonte
Hi Till,

Thanks for the feedback.

My use case is a little bit more tricky as I can’t key all the streams by the 
same field. 
Basically, I’m trying to evaluate continuous SPARQL queries, which consist of 
many joins. I’ve seen that SPARQL queries over RDF data have been discussed 
before on the mailing list, but not for RDF streams whose data is only valid 
within a certain time window.
To give you an example of a simple query, which looks for 'Persons that are 
sitting next to Students':
SELECT * WHERE {
  ?x a Person .
  ?x nextTo ?y .
  ?y a Student .
}
So everything that matches ‘?x a Person’ could be my A stream, ‘?x nextTo ?y’ 
my B stream and ‘?y a Student’ my C stream.
So for the first join, ‘?x a Person’ and ‘?x nextTo ?y’ need to be joined on 
variable ?x, i.e. the first field of the A and B streams, while ‘?x nextTo ?y’ 
and ‘?y a Student’ need to be joined on variable ?y, i.e. the second field of 
the B stream and the first field of the C stream.
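As a rough illustration of why a single key does not work here, the following 
plain-Java sketch (Java 16+, no Flink involved) evaluates the query over the 
triples that fall into one window, using one hash join on ?x and a second one 
on ?y. The Triple and Binding records and the personsNextToStudents method are 
hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical RDF triple: subject, predicate, object.
record Triple(String s, String p, String o) {}

// One solution: values bound to ?x and ?y.
record Binding(String x, String y) {}

class SparqlJoinSketch {
    // Evaluates { ?x a Person . ?x nextTo ?y . ?y a Student } over one window.
    static List<Binding> personsNextToStudents(List<Triple> window) {
        Map<String, List<Triple>> aByX = new HashMap<>(); // A: ?x a Person, keyed on ?x
        Map<String, List<Triple>> cByY = new HashMap<>(); // C: ?y a Student, keyed on ?y
        List<Triple> b = new ArrayList<>();               // B: ?x nextTo ?y
        for (Triple t : window) {
            if (t.p().equals("a") && t.o().equals("Person")) {
                aByX.computeIfAbsent(t.s(), k -> new ArrayList<>()).add(t);
            } else if (t.p().equals("a") && t.o().equals("Student")) {
                cByY.computeIfAbsent(t.s(), k -> new ArrayList<>()).add(t);
            } else if (t.p().equals("nextTo")) {
                b.add(t);
            }
        }
        // Join 1 on ?x: first field of A with first field of B.
        List<Binding> ab = new ArrayList<>();
        for (Triple nt : b) {
            for (Triple ignored : aByX.getOrDefault(nt.s(), List.of())) {
                ab.add(new Binding(nt.s(), nt.o()));
            }
        }
        // Join 2 on ?y: second field of B with first field of C.
        List<Binding> result = new ArrayList<>();
        for (Binding bind : ab) {
            for (Triple ignored : cByY.getOrDefault(bind.y(), List.of())) {
                result.add(bind);
            }
        }
        return result;
    }
}
```

Because the first join is keyed on ?x and the second on ?y, a streaming version 
would need a re-keying step between the two joins.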

As I can’t key the streams before windowing, I tried to combine the streams, 
window them and assign the window end time to each event. Then I separated the 
streams again and joined them using a CoProcessFunction. Based on the window 
end time, I know which events should be joined, as they are contained in the 
same window. I thought that I could store the events of both streams and the 
last seen window end timestamp. If an event arrives with a larger window end 
time, I could join the previously seen events from both streams and clear them.
However, I see that the events arrive out of order in my CoProcessFunction, 
requiring me to store the content of several windows. I was a little bit 
surprised by this behaviour, but perhaps it’s because I’m using a fixed dataset 
for easy testing? The idea then was to store the content of multiple windows 
and use the progression of the watermark to know which windows could be 
cleared, so I don’t need to store the content of all possible previous windows. 
However, it does not seem to be possible to combine a MapState with a list: 
the map state would contain the end time of each window as key, and a list 
with the previously seen content of that window as value. 
I’m guessing that there are more elegant and easier ways to solve this?
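The buffering idea described above (keep the content of each window keyed by 
its end time, and clear a window once the watermark has passed it) can be 
sketched in plain Java as follows. TreeMaps stand in for Flink state, and the 
class and method names are hypothetical. As a conservative assumption, a window 
is treated as complete once the watermark has reached its end time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical two-sided buffer: events of each input are grouped by the window
// end time assigned to them upstream. TreeMaps stand in for Flink state.
class WindowEndJoinBuffer<L, R> {
    private final TreeMap<Long, List<L>> left = new TreeMap<>();
    private final TreeMap<Long, List<R>> right = new TreeMap<>();

    // Events may arrive out of order; each one simply goes into its window's bucket.
    void addLeft(long windowEnd, L value) {
        left.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
    }

    void addRight(long windowEnd, R value) {
        right.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
    }

    // Joins every window whose end time is <= watermark, then drops its contents.
    // Assumption: such windows receive no further events.
    void onWatermark(long watermark, BiConsumer<L, R> emit) {
        Map<Long, List<L>> completeLeft = left.headMap(watermark, true);
        for (Map.Entry<Long, List<L>> entry : completeLeft.entrySet()) {
            List<R> rights = right.getOrDefault(entry.getKey(), List.of());
            for (L l : entry.getValue()) {
                for (R r : rights) {
                    emit.accept(l, r);
                }
            }
        }
        // headMap returns a live view, so clearing it removes the entries from the maps.
        completeLeft.clear();
        right.headMap(watermark, true).clear();
    }

    // Number of window buckets still held on either side (to check cleanup).
    int bufferedWindows() {
        return left.size() + right.size();
    }
}
```

In a Flink CoProcessFunction, the onWatermark step would presumably be triggered 
by event-time timers rather than called by hand.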

For the Table API, I was able to find a solution as well. I first combine the 
streams, window them and again assign the window end time as the timestamp of 
each event. I split the streams and convert them to tables. As the window end 
times are assigned, I can use these to window the data using intervals, e.g. 
‘A.ts BETWEEN B.ts AND B.ts’. This solution works, and it is easier to 
translate the SPARQL query to SQL. However, the program does not garbage 
collect the outdated content of the streams, as a window in the DataStream API 
would. I see that my Flink program keeps growing in size. Is there a 
translation of the Table API windows to DataStream windows?
Should I use the ‘setIdleStateRetentionTime’ configuration function to remove 
state?

Thanks in advance!

Kind regards,
Pieter

-
Dr. Ir. Pieter Bonte
Ghent University - imec 
IDLab 
iGent Tower - Department of Information Technology 
Technologiepark-Zwijnaarde 126, B-9052 Ghent, Belgium 
T: +32 9 33 14938; T Secr: +32 (0)9 33 14900 
F: +32 9 33 14899 
E: pieter.bo...@ugent.be 
W: IDLab.technology 
W: IDLab.ugent.be 

> On 17 Feb 2021, at 10:00, Till Rohrmann wrote:
> 
> Hi Pieter,
> 
> off the top of my head, I think the easiest way to solve this problem is to 
> implement your own "window join" operation by first unioning all three 
> streams and then applying a ProcessWindowFunction similar to
> 
> allEvents
>     .keyBy((KeySelector<Tuple, Tuple>) value -> value)
>     .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
>     .process(new ProcessWindowFunction<Tuple, Tuple, Tuple, TimeWindow>() {
>         @Override
>         public void process(
>                 Tuple tuple,
>                 Context context,
>                 Iterable<Tuple> elements,
>                 Collector<Tuple> out) throws Exception {
>             // compute join result from elements
>         }
>     });
> 
> @Timo is there an easier way using Flink's SQL or Table API?
> 
> Cheers,
> Till
> 
> On Tue, Feb 16, 2021 at 3:36 PM Pieter Bonte <pieter.bo...@ugent.be> wrote:
> Hi all,
> 
> I’m trying to apply a window operator over multiple streams (more than 2) and 
> join these streams within the validity of the window. However, I have some 
> questions about the time semantics using both the DataStream API and the 
> Table API/SQL.
> 
> Let's say we have 3 streams, an A, B and C stream. And currently we have an 
> A@0 (an A at timestamp 0), a B@5 and two C’s: C@6 and C@13.
> We would like to join these streams when they fall within a sliding window of 
> size 10 and slide 5.
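
Till’s suggestion above leaves the body of process(...) as a comment. Assuming 
the unioned events carry a tag saying which stream they came from, a plain-Java 
sketch (Java 16+) of that body could look as follows; TaggedEvent, JoinedABC 
and joinWindow are hypothetical names, and no Flink classes are used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical element of the unioned stream: source stream ("A", "B" or "C"),
// event timestamp and payload.
record TaggedEvent(String stream, long ts, String payload) {}

record JoinedABC(TaggedEvent a, TaggedEvent b, TaggedEvent c) {}

class WindowJoinBody {
    // All elements share the same key and the same window, so the three-way
    // join is the cross product of the A, B and C elements.
    static List<JoinedABC> joinWindow(Iterable<TaggedEvent> elements) {
        List<TaggedEvent> as = new ArrayList<>();
        List<TaggedEvent> bs = new ArrayList<>();
        List<TaggedEvent> cs = new ArrayList<>();
        for (TaggedEvent e : elements) {
            switch (e.stream()) {
                case "A" -> as.add(e);
                case "B" -> bs.add(e);
                case "C" -> cs.add(e);
                default -> throw new IllegalArgumentException("unknown stream " + e.stream());
            }
        }
        List<JoinedABC> out = new ArrayList<>();
        for (TaggedEvent a : as) {
            for (TaggedEvent b : bs) {
                for (TaggedEvent c : cs) {
                    out.add(new JoinedABC(a, b, c));
                }
            }
        }
        return out;
    }
}
```

Since every element of the window is available in one call, each result comes 
from a single window, which is the semantics asked for in the original 
question. Two caveats: with overlapping sliding windows, the same combination 
is emitted once per window that contains it, and the whole window content is 
buffered until the window fires.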

Joining and windowing multiple streams using DataStream API or Table API & SQL

2021-02-16 Thread Pieter Bonte
Hi all,

I’m trying to apply a window operator over multiple streams (more than 2) and 
join these streams within the validity of the window. However, I have some 
questions about the time semantics using both the DataStream API and the Table 
API/SQL.

Let's say we have 3 streams, an A, B and C stream. Currently we have an A@0 
(an A at timestamp 0), a B@5 and two C’s: C@6 and C@13.
We would like to join these streams when they fall within a sliding window of 
size 10 and slide 5.
Thus the first window W1[0,10) should contain A@0, B@5 and C@6.
The second window W2[5,15) should contain B@5, C@6 and C@13.
So only in the first window can all 3 streams be joined successfully.
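To double-check the expected window contents, here is a plain-Java sketch 
(Java 16+) that assigns each event to every sliding window containing it, with 
windows aligned to multiples of the slide (offset 0). The Stamped record and 
the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical labelled event, e.g. new Stamped("A@0", 0).
record Stamped(String label, long ts) {}

class SlidingWindowContents {
    // Start times of all windows [start, start + size) that contain ts,
    // assuming windows start at multiples of slide.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = Math.floorDiv(ts, slide) * slide; // latest start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Maps each window start to the labels of the events it contains.
    static Map<Long, List<String>> contents(List<Stamped> events, long size, long slide) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Stamped e : events) {
            for (long start : windowStarts(e.ts(), size, slide)) {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e.label());
            }
        }
        return windows;
    }
}
```

With the four example events, window [0,10) holds A@0, B@5 and C@6, and window 
[5,15) holds B@5, C@6 and C@13; the partial windows [-5,5) and [10,20) are 
produced as well.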

However, I’m not able to mimic this behaviour using the DataStream or Table API.


Using the DataStream API, joining multiple streams can be achieved by first 
applying a window to join stream A and stream B, and then applying a second 
window to join the result of the previous window with stream C, e.g.:

streamA
    .join(streamB)
    .where(...).equalTo(...)  // key selectors omitted
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))  // <- window Wab
    .apply(new JoinFunction() {...})
    .join(streamC)
    .where(...).equalTo(...)  // key selectors omitted
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))  // <- window Wabc
    .apply(new JoinFunction() {…});

However, according to the documentation on window joins [1] (and debugging), 
the joined events from the first window (Wab) are assigned a new timestamp: the 
largest timestamp that still lies in the respective window, i.e. just before 
the window closes. 
Thus the result of joining A@0 and B@5 over the first window (Wab) will be 
AB@9. When joining with the C stream, AB@9 can be joined with both C@6 and 
C@13. This is not the behaviour I would like to obtain, since A happened at 
timestamp 0, and C@13 is 13 time units later, more than the window size of 10.
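The cascading effect can be reproduced with a small plain-Java sketch. It 
treats timestamps as abstract units, so the largest timestamp in window [0,10) 
is 9 as in the text (in Flink itself this is the window end minus one 
millisecond). SIZE, SLIDE and the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class CascadedWindowJoin {
    static final long SIZE = 10;
    static final long SLIDE = 5;

    // Start times of the sliding windows [start, start + SIZE) that contain ts.
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        for (long start = Math.floorDiv(ts, SLIDE) * SLIDE; start > ts - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    // Timestamp of a join result emitted by the window starting at windowStart:
    // the largest timestamp still inside that window.
    static long resultTimestamp(long windowStart) {
        return windowStart + SIZE - 1;
    }

    // True if at least one window contains both t1 and t2, i.e. a window join
    // would pair the two elements.
    static boolean shareWindow(long t1, long t2) {
        for (long start : windowStarts(t1)) {
            if (t2 >= start && t2 < start + SIZE) {
                return true;
            }
        }
        return false;
    }
}
```

A@0 and B@5 share only the window [0,10), so AB is emitted with timestamp 
resultTimestamp(0) = 9. In the second join, shareWindow(9, 13) holds while 
shareWindow(0, 13) does not: the intermediate timestamp lets C@13 match even 
though no single window contains both A@0 and C@13.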

Using the Table API or SQL, I think this can be solved using interval joins 
[2]. However, the windowing semantics seem to be different, as you need to 
define one table (or stream) around which you want to apply an interval. 
Depending on which table the interval is applied around, different results can 
be obtained. For example, let's say we have 3 table versions of our previous 
streams, i.e. A, B and C, each with a time attribute ‘ts’.
Applying an interval around table A would result in something like:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
    A.ts BETWEEN B.ts - INTERVAL '5' MINUTE AND B.ts + INTERVAL '5' MINUTE AND
    A.ts BETWEEN C.ts - INTERVAL '5' MINUTE AND C.ts + INTERVAL '5' MINUTE

So if we want a window of 10, I think we split the interval into 5 minutes 
before and 5 minutes after? However, now A@0 is not in the interval of C@6. 
Applying an interval of 10 would solve this problem. However, if we apply an 
interval of 10 both before and after, but choose to fix the interval around B 
instead, we run into a different problem:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
    B.ts BETWEEN A.ts - INTERVAL '10' MINUTE AND A.ts + INTERVAL '10' MINUTE AND
    B.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE

In this case B@5 is in the interval of A@0 but also in that of C@13, so A@0, 
B@5 and C@13 are joined even though A and C are 13 apart.
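Treating the timestamps as plain numbers (ignoring the MINUTE unit), the two 
WHERE clauses can be checked against the example events with a short 
plain-Java sketch; within mirrors the inclusive SQL BETWEEN, and all names are 
hypothetical.

```java
class IntervalPredicates {
    // x BETWEEN y - d AND y + d (BETWEEN is inclusive on both ends).
    static boolean within(long x, long y, long d) {
        return x >= y - d && x <= y + d;
    }

    // First query: intervals fixed around A.
    static boolean aroundA(long a, long b, long c, long d) {
        return within(a, b, d) && within(a, c, d);
    }

    // Second query: intervals fixed around B.
    static boolean aroundB(long a, long b, long c, long d) {
        return within(b, a, d) && within(b, c, d);
    }
}
```

With d = 5 around A, C@6 is rejected; with d = 10 around B, A@0, B@5 and C@13 
are all accepted, because no predicate relates A and C directly 
(within(0, 13, 10) would be false).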

So my question is: how can I join multiple streams so that the result matches 
the behaviour of joining all the streams in the same window? 
Should I write my own WindowOperator that assigns each joined result the 
smallest timestamp of its events, instead of the time at which the window closes?
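Whatever timestamp such an operator assigns, it eventually has to decide 
whether a set of events could have been in one sliding window together. One 
hypothetical way is to carry both the smallest and the largest input timestamp 
of every partial result and apply the check below; this is plain Java under the 
assumption that windows start at multiples of the slide, not an existing Flink 
API.

```java
class SameWindowCheck {
    // True if a single sliding window [start, start + size), with start a
    // multiple of slide, contains all timestamps. The latest window start
    // <= min(ts) has the latest end, so it is the only candidate to check.
    static boolean fitInOneWindow(long size, long slide, long... ts) {
        if (ts.length == 0) {
            throw new IllegalArgumentException("need at least one timestamp");
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long t : ts) {
            min = Math.min(min, t);
            max = Math.max(max, t);
        }
        long start = Math.floorDiv(min, slide) * slide;
        return max < start + size;
    }
}
```

Two consequences: a span max - min smaller than the window size is not enough 
(4, 6 and 11 span only 7 but fail the check), and carrying only the smallest 
timestamp is not enough either: for A@0 joined with B@5, a hypothetical C@-4 
shares the window [-5,5) with A@0, yet no window contains all of -4, 0 and 5.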

Thanks in advance!

Kind regards,
Pieter

// code examples taken from [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
[3] https://stackoverflow.com/a/50879029