[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386365#comment-16386365
 ] 

ASF GitHub Bot commented on FLINK-8601:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5641
  
For ease of discussion later.


> Introduce PartitionedBloomFilter for Approximate calculation and other 
> situations of performance optimization
> -
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> h3. Background
> Bloom filters are useful in many situations, for example:
>  * 1. Approximate calculation: deduplication (e.g. UV (unique visitor) calculation)
>  * 2. Performance optimization: e.g. [runtime filter 
> join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
> By using a BF, we can greatly reduce the number of state queries in a stream 
> join. The filtered-out queries would never find a result anyway, and such 
> misses perform poorly on RocksDB-based state because they traverse the 
> {{sst}} files on disk. 
> However, with the state primitives Flink currently provides, it is hard to use 
> a Bloom filter, for the following reasons:
>  * 1. Serialization: Bloom filter state can be large (e.g. 100M). If it is 
> implemented on top of RocksDB state, the state data must be (de)serialized on 
> every query and update, and performance will be very poor.
>  * 2. Data skew: data in different key groups can be skewed, and the degree of 
> skew cannot be predicted accurately before the program runs. It is therefore 
> impossible to determine up front how much space the Bloom filter should 
> allocate. One option is to allocate the space needed for the most skewed case, 
> but this can waste a great deal of resources.
> h3. Requirements
> Therefore, I propose a PartitionedBloomFilter for Flink, which should at least 
> meet the following requirements:
>  * 1. Support changing the parallelism.
>  * 2. Serialize only when necessary, i.e. when performing a checkpoint.
>  * 3. Handle data skew: users only specify the expected number of insertions 
> and the desired fpp of a PartitionedBloomFilter, and the system allocates 
> resources dynamically.
>  * 4. Do not conflict with other state: users can still use KeyedState and 
> OperatorState alongside this Bloom filter.
>  * 5. Support a relaxed TTL (i.e. data survives at least as long as the 
> specified time).
> Design doc:  [design 
> doc|https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing]
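Requirement 3 asks users to supply only the expected number of insertions n and the fpp p. The standard Bloom filter sizing formulas turn these into a bit count m = -n * ln(p) / (ln 2)^2 and a hash-function count k = (m / n) * ln 2. The Java sketch below is a hypothetical, minimal filter that applies those formulas and shows the `mightContain` pre-check from background item 2; the class name `SimpleBloomFilter` and its FNV-1a plus double-hashing scheme are illustrative assumptions, not code from the PR or the design doc.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Hypothetical minimal Bloom filter sized from expected insertions (n) and fpp (p).
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;   // m
    private final int numHashes; // k

    SimpleBloomFilter(long expectedInsertions, double fpp) {
        if (expectedInsertions <= 0 || fpp <= 0.0 || fpp >= 1.0) {
            throw new IllegalArgumentException("need expectedInsertions > 0 and 0 < fpp < 1");
        }
        double ln2 = Math.log(2);
        // m = -n * ln(p) / (ln 2)^2
        double m = -expectedInsertions * Math.log(fpp) / (ln2 * ln2);
        this.numBits = (int) Math.max(1, Math.ceil(m));
        // k = (m / n) * ln 2, rounded to the nearest integer
        this.numHashes = (int) Math.max(1, Math.round((double) numBits / expectedInsertions * ln2));
        this.bits = new BitSet(numBits);
    }

    int numBits() { return numBits; }

    int numHashes() { return numHashes; }

    void put(String key) {
        long h = hash64(key);
        for (int i = 1; i <= numHashes; i++) {
            bits.set(index(h, i));
        }
    }

    /** False means "definitely absent", so the expensive state lookup can be skipped. */
    boolean mightContain(String key) {
        long h = hash64(key);
        for (int i = 1; i <= numHashes; i++) {
            if (!bits.get(index(h, i))) {
                return false;
            }
        }
        return true; // possibly present; may be a false positive
    }

    // Double hashing: index_i = h1 + i * h2 (mod m), with h1/h2 taken from one 64-bit hash.
    private int index(long hash, int i) {
        int h1 = (int) hash;
        int h2 = (int) (hash >>> 32);
        return Math.floorMod(h1 + i * h2, numBits);
    }

    // FNV-1a 64-bit over the UTF-8 bytes, followed by MurmurHash3's fmix64 finalizer.
    private static long hash64(String key) {
        long h = 0xcbf29ce484222325L;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }
}
```

For n = 1,000,000 and p = 0.01 the formulas give roughly 9.59 million bits (about 1.14 MiB) and k = 7. In a stream join, a `false` from `mightContain` means the RocksDB lookup can be skipped; a `true` still requires the lookup, because it may be a false positive.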



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16386361#comment-16386361
 ] 

ASF GitHub Bot commented on FLINK-8601:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5641

[FLINK-8601][WIP] Introduce PartitionedBloomFilter for Approximate 
calculation and other situations of performance optimization

This PR introduces PartitionedBloomFilter, which supports rescaling and 
handles data skew properly.
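Rescaling is simplest if the filter is split the same way Flink splits keyed state: into key groups, with whole key groups moving between subtasks when the parallelism changes. Assuming PartitionedBloomFilter follows that scheme (an assumption; the design doc is not quoted here), the Java sketch below keeps one bit set per key group and reassigns partitions on rescale without touching their bits. `operatorIndexFor` uses the same arithmetic as Flink's `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup`; `keyGroupFor` is a simplified stand-in for Flink's murmur-hash-based assignment, and the class `KeyGroupRescaling` is hypothetical.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: Bloom filter bits partitioned by key group, so rescaling moves whole partitions.
final class KeyGroupRescaling {

    // Simplified stand-in for Flink's key-group assignment, which applies
    // MathUtils.murmurHash to key.hashCode(); results differ from Flink's.
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode() * 0x9E3779B9; // multiplicative mixing
        h ^= h >>> 16;
        return Math.floorMod(h, maxParallelism);
    }

    // Same arithmetic as Flink's KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /**
     * Reassigns each key group's partition (keyGroup -> bits) to the subtask that owns
     * that key group under the new parallelism. Bits are moved as-is, never re-hashed.
     */
    static List<Map<Integer, BitSet>> rescale(
            List<Map<Integer, BitSet>> oldSubtasks, int maxParallelism, int newParallelism) {
        List<Map<Integer, BitSet>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new HashMap<>());
        }
        for (Map<Integer, BitSet> subtask : oldSubtasks) {
            for (Map.Entry<Integer, BitSet> e : subtask.entrySet()) {
                int target = operatorIndexFor(e.getKey(), maxParallelism, newParallelism);
                result.get(target).put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

Moving partitions whole matters because a Bloom filter cannot be re-split after the fact: it does not store the original keys, so bits inserted under one partitioning cannot be redistributed by key later.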

## Brief change log

- introduce PartitionedBloomFilter for Approximate calculation and other 
situations of performance optimization.

## Verifying this change

This change can be verified by the unit tests in the following files:
- PartitionedBloomFilterTest.java
- LinkedBloomFilterTest.java
- LinkedBloomFilterNodeTest.java
- PartitionedBloomFilterManagerTest.java
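The test list above names LinkedBloomFilter and LinkedBloomFilterNode, whose code is not shown here. One common way to cope with unpredictable skew (requirement 3) and a relaxed TTL (requirement 5) is to chain fixed-capacity filters: start small, append a new node when the current one reaches its capacity, and drop a node only once its most recent write is older than the TTL, so every element survives at least the TTL. The Java sketch below is a hypothetical illustration of that idea; `ChainedBloomFilter`, its node sizing, and its hash are assumptions, not the PR's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical sketch: a growable chain of fixed-capacity Bloom filter nodes with a relaxed TTL.
final class ChainedBloomFilter {

    /** One fixed-size Bloom filter, sized for `capacity` insertions at `fpp`. */
    private static final class Node {
        final BitSet bits;
        final int numBits;
        final int numHashes;
        final int capacity;
        int inserted;
        long lastWriteMillis;

        Node(int capacity, double fpp, long nowMillis) {
            double m = -capacity * Math.log(fpp) / (Math.log(2) * Math.log(2));
            this.numBits = (int) Math.max(1, Math.ceil(m));
            this.numHashes = (int) Math.max(1, Math.round((double) numBits / capacity * Math.log(2)));
            this.bits = new BitSet(numBits);
            this.capacity = capacity;
            this.lastWriteMillis = nowMillis;
        }

        // Double hashing: index_i = h1 + i * h2 (mod numBits).
        int index(long hash, int i) {
            return Math.floorMod((int) hash + i * (int) (hash >>> 32), numBits);
        }

        void put(long hash) {
            for (int i = 1; i <= numHashes; i++) bits.set(index(hash, i));
        }

        boolean mightContain(long hash) {
            for (int i = 1; i <= numHashes; i++) {
                if (!bits.get(index(hash, i))) return false;
            }
            return true;
        }
    }

    private final Deque<Node> nodes = new ArrayDeque<>(); // oldest node first
    private final int nodeCapacity;
    private final double nodeFpp;
    private final long ttlMillis;

    ChainedBloomFilter(int nodeCapacity, double nodeFpp, long ttlMillis) {
        this.nodeCapacity = nodeCapacity;
        this.nodeFpp = nodeFpp;
        this.ttlMillis = ttlMillis;
    }

    /** Inserts into the newest node, appending a fresh node once the current one is full. */
    void put(String key, long nowMillis) {
        Node tail = nodes.peekLast();
        if (tail == null || tail.inserted >= tail.capacity) {
            tail = new Node(nodeCapacity, nodeFpp, nowMillis);
            nodes.addLast(tail);
        }
        tail.put(hash64(key));
        tail.inserted++;
        tail.lastWriteMillis = nowMillis;
    }

    /** False means definitely absent from every live node. */
    boolean mightContain(String key) {
        long h = hash64(key);
        for (Node node : nodes) {
            if (node.mightContain(h)) return true;
        }
        return false;
    }

    /** Drops nodes whose latest write is more than ttlMillis old, so each entry lives at least ttlMillis. */
    void expire(long nowMillis) {
        Iterator<Node> it = nodes.iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().lastWriteMillis > ttlMillis) it.remove();
        }
    }

    int nodeCount() {
        return nodes.size();
    }

    // FNV-1a 64-bit over the UTF-8 bytes, followed by MurmurHash3's fmix64 finalizer.
    private static long hash64(String key) {
        long h = 0xcbf29ce484222325L;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }
}
```

A lookup must probe every live node, and by the union bound the chain's false positive probability is at most the sum of the nodes' fpp values, so it grows with the number of nodes; scalable Bloom filter variants counter this by giving each new node a tighter fpp.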

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
 doc: [google 
doc](https://docs.google.com/document/d/1s8w2dkNFDM9Fb2zoHwHY0hJRrqatAFta42T97nDXmqc/edit?usp=sharing)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink bloomfilter_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5641


commit 5429abe0031a93596b12dada6e9696f3179eb4e8
Author: summerleafs 
Date:   2018-02-06T16:47:25Z

introduce bloom filter state.

commit 2d1f66c10fbf74272be76283b909b290ae55d4fd
Author: summerleafs 
Date:   2018-02-07T14:52:22Z

add unit tests for bloom filter state.

commit 433370a12814f7bd80127d4508e1dd0812a9d3fe
Author: summerleafs 
Date:   2018-02-07T18:12:13Z

add general type support.

commit 5e05ee84353516fe7ff6eb7dd3a01dfdb3337bc5
Author: summerleafs 
Date:   2018-02-09T15:10:11Z

this is a tmp commit.

commit 6e4ff0cebed853c598e0647e9f8aa56b5b59d0cc
Author: summerleafs 
Date:   2018-02-10T14:30:13Z

this is a tmp commit.

commit aa672e6e1e89b185722fde44a9b4044b87010c99
Author: summerleafs 
Date:   2018-02-10T15:32:01Z

this is a tmp commit.

commit 3b04502ba277cad2a7b0bc381fb192d18b56f17d
Author: summerleafs 
Date:   2018-02-11T11:34:54Z

fix build.

commit 775d6aaf354de35c7ddff242f8e006e13e9a0e76
Author: summerleafs 
Date:   2018-02-12T03:52:43Z

add annotation for classes.

commit b7f04303aa1ec1fbe9696bb58b13838b6a74a7ae
Author: summerleafs 
Date:   2018-02-12T03:53:19Z

a temp commit.

commit 28222bf5fc352a26082f2aee19be70ca5f9aa9d9
Author: sihuazhou 
Date:   2018-03-05T16:48:15Z

fix build.





[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377152#comment-16377152
 ] 

Aljoscha Krettek commented on FLINK-8601:
-

I can have a look at this once the 1.5 release is out, since we're currently 
very busy with that.



[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-22 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373978#comment-16373978
 ] 

Sihua Zhou commented on FLINK-8601:
---

[~aljoscha] Do you have any advice on this? Would it be OK if I submit a PR for 
this soon? I hope you can review it and leave your valuable comments.



[jira] [Commented] (FLINK-8601) Introduce PartitionedBloomFilter for Approximate calculation and other situations of performance optimization

2018-02-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358222#comment-16358222
 ] 

Fabian Hueske commented on FLINK-8601:
--

Thanks for the detailed design document [~sihuazhou]! 
Now I have a pretty good understanding of the proposal. 

I think this would be a nice feature, but I'm not sure if it is generic enough 
to go into the base classes that are used by all functions or whether it would 
make more sense to expose it only to specific functions like the 
{{ProcessFunction}}.

[~aljoscha], what do you think about the proposal?

> Introduce PartitionedBloomFilter for Approximate calculation and other 
> situations of performance optimization
> -
>
> Key: FLINK-8601
> URL: https://issues.apache.org/jira/browse/FLINK-8601
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>