[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join

2018-11-18 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8918:
-
Fix Version/s: (was: 1.7.0)

> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.8.0
>
>
> In general, stream join is one of the most performance-costly tasks. For every 
> record from either side, we need to query the state of the other side, which 
> leads to poor performance when the state size is huge. So, in production, we 
> always need to spend a lot of slots to handle stream joins. However, we can 
> improve this by exploiting a phenomenon commonly found in production: the 
> _joined ratio_ of a stream join is often very low. For example:
>  - stream join in promotion analysis: the job needs to join the promotion log 
> with the action (click, view, payment, collection, retweet) log on 
> `promotion_id` to analyze the effect of the promotion.
>  - stream join in AD (advertising) attribution: the job needs to join the AD 
> click log with the item payment log on `click_id` to find which click on which 
> AD brought the payment, for attribution.
>  - stream join in click log analysis of docs: the job needs to join the view 
> log (docs viewed by users) with the click log (docs clicked by users) to 
> analyze the reasons for the clicks and the properties of the users.
>  - ... and so on.
> All these cases share one property: the _joined ratio_ is very low. For 
> example, imagine that we have 1 records from the left stream and 1 records 
> from the right stream, and we execute 
> _select * from leftStream l join rightStream r on l.id = r.id_, but we only 
> get 100 records in the result. That is a low _joined ratio_. This example is 
> an inner join, but the same applies to left and right joins.
> I can come up with more examples of a low _joined ratio_, but the most 
> important point I want to make is that a low _joined ratio_ is a very common 
> phenomenon for stream joins in production (perhaps the most common case in 
> some companies; at least it is in ours).
> *Then how can we improve it?*
> From the above case we can see that joining 1 records with 1 records yields 
> only 100 results. That means we query the state 2 times (1 for the left 
> stream and 1 for the right stream), but only 100 of those queries are 
> meaningful. If we can reduce the number of useless queries, we can 
> significantly improve the performance of stream joins.
> The way we propose to improve this is to introduce the _Runtime Filter Join_. 
> The main idea is to build a _filter_ for the state on each side (left stream & 
> right stream). Before querying the state of one side, we first check the 
> corresponding _filter_ to see whether the _key_ could be in that state. If the 
> _filter_ says "no, it is not in the state", we skip querying the state; if it 
> says "it may be in the state", we query the state. The natural choice for the 
> _filter_ is a _Bloom Filter_, which has the properties we need: _extremely good 
> performance_ and _no false negatives_. It can return false positives, but a 
> false positive only costs one unnecessary state query, never a missed join 
> result.
>  
> *The simplest pseudo code for the _Runtime Filter Join_ (the inline comments 
> assume the RocksDBBackend):*
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
>     // Performs a `seek()` on RocksDB and then iterates entry by entry;
>     // this is expensive, especially when the key cannot be found in RocksDB.
>     Iterator<Record> rightIterator = rightStreamState.iterator();
>     while (rightIterator.hasNext()) {
>         Record recordFromRightState = rightIterator.next();
>         // ...
>     }
> }
>
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
>     Iterator<Record> rightIterator = Collections.emptyIterator();
>     // Perform the `seek()` only when rightStreamFilter.containsCurrentKey()
>     // returns true, i.e. the key may be present in the right stream's state.
>     if (rightStreamFilter.containsCurrentKey()) {
>         rightIterator = rightStreamState.iterator();
>     }
>     while (rightIterator.hasNext()) {
>         Record recordFromRightState = rightIterator.next();
>         // ...
>     }
>
>     // Add the current key to the left stream's filter.
>     leftStreamFilter.addCurrentKey();
> }
> {code}
> A description of Runtime Filter Join for batch joins can be found 
> [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
>  (even though it was not originally written for stream joins, it can easily 
> be applied to `stream join`).
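
The pseudo code above leaves the filter and the state abstract. Below is a minimal, self-contained sketch of the same idea under stated assumptions:
- `HashMap`s stand in for each side's keyed state. This is not Flink's state API.
- A hand-rolled Bloom filter (`KeyFilter`, a hypothetical helper) plays the role of `containsCurrentKey()` / `addCurrentKey()`. It is sized with the standard formulas m = -n ln(p) / (ln 2)^2 bits and k = (m/n) ln 2 hash functions.
- All class and method names are illustrative only.

{code:java}
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a runtime filter join: HashMaps stand in for
// keyed state (not Flink's state API) and each side keeps a Bloom filter over
// the keys it has stored, like containsCurrentKey()/addCurrentKey() above.
class RuntimeFilterJoinSketch {

    // Minimal Bloom filter sized with m = -n ln(p) / (ln 2)^2 bits and
    // k = (m / n) ln 2 hash functions.
    static final class KeyFilter {
        private final BitSet bits;
        private final int numBits;
        private final int numHashes;

        KeyFilter(int expectedKeys, double falsePositiveRate) {
            double ln2 = Math.log(2);
            numBits = (int) Math.ceil(-expectedKeys * Math.log(falsePositiveRate) / (ln2 * ln2));
            numHashes = Math.max(1, (int) Math.round((double) numBits / expectedKeys * ln2));
            bits = new BitSet(numBits);
        }

        // Double hashing: the i-th bit position is h1 + i * h2 (mod m).
        private int index(Object key, int i) {
            int h1 = key.hashCode();
            int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 16) | 1;
            return Math.floorMod(h1 + i * h2, numBits);
        }

        void add(Object key) {
            for (int i = 0; i < numHashes; i++) bits.set(index(key, i));
        }

        // false: the key was never added; true: the key may have been added.
        boolean mightContain(Object key) {
            for (int i = 0; i < numHashes; i++) {
                if (!bits.get(index(key, i))) return false;
            }
            return true;
        }
    }

    final Map<String, List<String>> leftState = new HashMap<>();
    final Map<String, List<String>> rightState = new HashMap<>();
    final KeyFilter leftFilter = new KeyFilter(10_000, 0.01);
    final KeyFilter rightFilter = new KeyFilter(10_000, 0.01);
    final List<String> results = new ArrayList<>();
    long stateLookups = 0;   // probes that actually queried the other side's state
    long skippedLookups = 0; // probes answered "definitely absent" by a filter

    void onLeft(String key, String value) {
        for (String match : probe(rightFilter, rightState, key)) {
            results.add(value + "|" + match);
        }
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        leftFilter.add(key); // keep the left filter in sync with the left state
    }

    void onRight(String key, String value) {
        for (String match : probe(leftFilter, leftState, key)) {
            results.add(match + "|" + value);
        }
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        rightFilter.add(key);
    }

    // Consult the other side's filter first; only query its state when the
    // key may be present.
    private List<String> probe(KeyFilter filter, Map<String, List<String>> state, String key) {
        if (!filter.mightContain(key)) {
            skippedLookups++;
            return Collections.emptyList();
        }
        stateLookups++;
        return state.getOrDefault(key, Collections.emptyList());
    }

    public static void main(String[] args) {
        RuntimeFilterJoinSketch join = new RuntimeFilterJoinSketch();
        for (int i = 0; i < 1000; i++) join.onLeft("k" + i, "l" + i);
        for (int i = 990; i < 1990; i++) join.onRight("k" + i, "r" + i);
        System.out.println("results=" + join.results.size()
                + " stateLookups=" + join.stateLookups
                + " skippedLookups=" + join.skippedLookups);
    }
}
{code}

The keys in `main` are made up: 1000 left keys, 1000 right keys, 10 shared. With them, every left record and almost every right record is answered by a filter without touching the state, and the join result is the same as without filters.

One design question the sketch leaves open: a standard Bloom filter cannot delete keys. A real operator would therefore need some way to rebuild or expire the filters when state is cleaned up (for example by state TTL).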



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join

2018-11-18 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8918:
-
Fix Version/s: 1.8.0

> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.7.0, 1.8.0


[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join

2018-08-06 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8918:
-
Fix Version/s: (was: 1.6.0)
   1.7.0

> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.7.0


[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8918:
--

[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join

2018-05-15 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8918:
--

[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join

2018-03-14 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-8918:
-
Issue Type: Improvement  (was: Bug)

> Introduce Runtime Filter Join
> -
>
> Key: FLINK-8918
> URL: https://issues.apache.org/jira/browse/FLINK-8918
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Right now, for every record that needs to be joined, we need to query both 
> the `left stream's state` and the `right stream's state`. I propose to 
> introduce RF (Runtime Filter) join to reduce the `query count` of state, which 
> could improve the performance of `stream join`, especially when the joined 
> rate is low. A simple description of RF join can be found 
> [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
>  (even though it was not originally written for stream joins, it can easily 
> be applied to `stream join`).
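
To make the `query count` cost described above concrete, here is a hypothetical sketch. It counts how many state queries a plain symmetric stream join (no filter) issues and how many of them actually find a match. `HashMap`s stand in for keyed state; this is not Flink's state API. The key ranges in `main` are made-up illustration values, not measurements.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a symmetric stream join without any filter: every
// arriving record queries the other side's state, then is stored in its own.
// HashMaps stand in for keyed state; this is not Flink's state API.
class NaiveJoinLookupCounter {
    final Map<Integer, Integer> leftState = new HashMap<>();   // key -> record count
    final Map<Integer, Integer> rightState = new HashMap<>();
    long stateLookups = 0;   // every query of the other side's state
    long usefulLookups = 0;  // queries that found at least one matching record

    void onLeft(int key) {
        query(rightState, key);
        leftState.merge(key, 1, Integer::sum);
    }

    void onRight(int key) {
        query(leftState, key);
        rightState.merge(key, 1, Integer::sum);
    }

    private void query(Map<Integer, Integer> otherSide, int key) {
        stateLookups++;
        if (otherSide.containsKey(key)) {
            usefulLookups++;
        }
    }

    public static void main(String[] args) {
        NaiveJoinLookupCounter join = new NaiveJoinLookupCounter();
        for (int k = 0; k < 1000; k++) join.onLeft(k);      // left keys 0..999
        for (int k = 990; k < 1990; k++) join.onRight(k);   // right keys 990..1989
        System.out.println(join.usefulLookups + " of " + join.stateLookups
                + " state queries found a match");
    }
}
{code}

With these keys, only 10 of the 2000 state queries find a match. An RF join tries to close this gap between issued and useful queries by answering most of them from a filter instead of the state.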


