Greg Hogan created FLINK-2907:
---------------------------------

             Summary: Bloom filter for Join
                 Key: FLINK-2907
                 URL: https://issues.apache.org/jira/browse/FLINK-2907
             Project: Flink
          Issue Type: New Feature
          Components: Java API, Scala API
    Affects Versions: 1.0
            Reporter: Greg Hogan
            Assignee: Greg Hogan


A bloom filter can be applied as a chainable operation on probe side Join 
elements. An element rejected by the bloom filter will not be serialized, 
shipped, deserialized, or processed.
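
As a rough illustration of the probe side pre-filter described above, here is a 
minimal sketch in Python rather than Flink code. The BloomFilter class, the 
prefilter_probe helper, and the SHA-256 based double hashing are all 
assumptions made for this example and are not part of any Flink API.

```python
import hashlib


class BloomFilter:
    """Fixed-size Bloom filter backed by a bytearray (illustrative only)."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive k bit positions from two 64-bit hashes (double hashing).
        digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # False means "definitely absent"; True means "possibly present".
        return all(
            self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key)
        )


def prefilter_probe(probe_elements, key_of, bloom):
    """Yield only the probe elements whose join key may exist on the hash side."""
    for element in probe_elements:
        if bloom.might_contain(key_of(element)):
            yield element


# Hash side: three keyed records. Probe side: 1000 keyed records.
hash_side = [(1, "a"), (2, "b"), (3, "c")]
probe_side = [(k, "p%d" % k) for k in range(1000)]

bloom = BloomFilter(num_bits=1024, num_hashes=6)
for key, _ in hash_side:
    bloom.add(key)

# Only the survivors would need to be serialized and shipped to the join.
survivors = list(prefilter_probe(probe_side, lambda e: e[0], bloom))
```

The filter never drops a true match, so the join result is unchanged; false 
positives only cost some wasted shipping.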

Generating the bloom filter is a chainable operation over hash side elements. 
The bloom filters created on each TaskManager must be the same size so that 
they can be combined by bitwise OR (the union of two bloom filters; xor would 
clear any bit set on two TaskManagers). The most efficient means to distribute 
the bloom filter is to assign each TaskManager an equal partition that it will 
receive from all other TaskManagers. Each TaskManager broadcasts its partition 
once all local elements (by hashing) and remote partitions (by OR) have been 
merged into that part of the bloom filter.
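
The partitioned exchange above can be simulated in a single process. This is a 
sketch only: the function names merge_or, partition, and distribute are 
hypothetical, workers are modelled as list entries, and the filter length is 
assumed to divide evenly by the number of workers. Merging uses bitwise OR, the 
standard union for bloom filters, because a bit set by two workers must stay 
set.

```python
def merge_or(a, b):
    """Union of two equally sized bloom filter bit arrays."""
    if len(a) != len(b):
        raise ValueError("filters must be the same size to be merged")
    return bytes(x | y for x, y in zip(a, b))


def partition(bits, num_parts):
    """Split a bit array into num_parts equal slices (assumes even division)."""
    size = len(bits) // num_parts
    return [bits[i * size:(i + 1) * size] for i in range(num_parts)]


def distribute(local_filters):
    """Simulate the two-phase exchange; returns the full filter seen by each worker."""
    n = len(local_filters)
    part_size = len(local_filters[0]) // n

    # Phase 1: worker w receives partition w from every worker and ORs them.
    owned = []
    for w in range(n):
        merged = bytes(part_size)
        for local in local_filters:
            merged = merge_or(merged, partition(local, n)[w])
        owned.append(merged)

    # Phase 2: every worker broadcasts its finished partition, and each
    # worker concatenates the partitions in order to get the full filter.
    full = b"".join(owned)
    return [full for _ in range(n)]


# Four workers, each with a 4-byte local filter.
filters = [
    bytes([0b0001, 0, 0, 0]),
    bytes([0, 0b0010, 0, 0]),
    bytes([0b0100, 0, 0, 0b1000]),
    bytes(4),
]
result = distribute(filters)
```

Each worker ends up with the same filter it would get from OR-ing all local 
filters directly, but no single worker has to receive every full filter.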

An example with numbers: triangle listing/counting joining 2B edges on 149B 
two-paths resulting in 21B triangles (this is using the optimal algorithm). At 
8 bits per element the bloom filter will have a false-positive rate of ~2% and 
require a 2 GB bloom filter (stored once and shared per TaskManager). Each 
TaskManager both sends and receives data equivalent to the size of the bloom 
filter (minus the local partition, the size of which trends towards zero as the 
number of TaskManagers increases). The number of matched elements is 21B (true 
positives) + ~0.02*(149B-21B) = ~23.5B, a reduction of 84% or 1.5 TB (at 12 
bytes per element). With 4 TaskManagers only 12 GB of bloom filter would be 
transmitted, about 0.8% of the 1.5 TB avoided, for a net savings of 99.2%.
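
The arithmetic in this example can be reproduced with a few lines of Python. 
The inputs are the figures quoted above; the variable names are chosen for this 
sketch, and the traffic accounting (one merge phase plus one broadcast phase, 
each moving the filter minus the local partition per TaskManager) is one 
reading of the description, not a measured result.

```python
hash_elements = 2e9          # edges on the hash side
probe_elements = 149e9       # two-paths on the probe side
true_matches = 21e9          # triangles
bits_per_element = 8
false_positive_rate = 0.02   # approximate rate quoted for 8 bits per element
bytes_per_element = 12
task_managers = 4

# Filter size: 8 bits for each of the 2B hash side elements.
filter_bytes = hash_elements * bits_per_element / 8

# Probe elements that pass the filter: true matches plus false positives.
passed = true_matches + false_positive_rate * (probe_elements - true_matches)
dropped = probe_elements - passed
reduction = dropped / probe_elements
bytes_saved = dropped * bytes_per_element

# Each TaskManager sends (n-1)/n of the filter during the merge phase and
# the same amount again when broadcasting its finished partition.
per_tm_sent = 2 * filter_bytes * (task_managers - 1) / task_managers
total_filter_traffic = per_tm_sent * task_managers
net_savings = 1 - total_filter_traffic / bytes_saved

print("filter size: %.1f GB" % (filter_bytes / 1e9))
print("elements passed: %.2fB" % (passed / 1e9))
print("reduction: %.0f%%, %.2f TB" % (reduction * 100, bytes_saved / 1e12))
print("filter traffic: %.0f GB" % (total_filter_traffic / 1e9))
print("net savings: %.1f%%" % (net_savings * 100))
```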

Key issues are determining the size of the bloom filter (dependent on the count 
of hash side elements, the available memory segments, and the error rate) and 
whether this can be integrated with Join or must be a separate operator. The 
approach also depends on dynamic memory allocation, since spilling to disk would 
incur the serialization, write, read, and deserialization we are looking to 
avoid.
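
For the sizing question, the standard bloom filter formulas give a starting 
point. The sketch below is an assumption about how sizing might be approached, 
not a proposed Flink implementation: the function names are hypothetical, and 
the memory budget is expressed as a plain byte count rather than Flink memory 
segments.

```python
import math


def optimal_num_hashes(bits_per_element):
    """Number of hash functions minimizing false positives: k = (m/n) * ln 2."""
    return max(1, round(bits_per_element * math.log(2)))


def expected_false_positive_rate(bits_per_element, num_hashes):
    """Standard approximation: (1 - e^(-k*n/m))^k."""
    return (1.0 - math.exp(-num_hashes / bits_per_element)) ** num_hashes


def bits_for_target(num_elements, target_rate):
    """Bits needed to reach target_rate with optimal k: m = -n ln p / (ln 2)^2."""
    return math.ceil(-num_elements * math.log(target_rate) / (math.log(2) ** 2))


def size_filter(num_elements, target_rate, memory_budget_bytes):
    """Return (num_bits, num_hashes, expected_rate), capped by the memory budget."""
    wanted = bits_for_target(num_elements, target_rate)
    num_bits = min(wanted, memory_budget_bytes * 8)
    bits_per_element = num_bits / num_elements
    k = optimal_num_hashes(bits_per_element)
    return num_bits, k, expected_false_positive_rate(bits_per_element, k)


# 2B hash side elements, a 2% target, and a 2 GB budget: the budget caps
# the filter at 8 bits per element, which gives a rate slightly above 2%.
num_bits, num_hashes, rate = size_filter(2_000_000_000, 0.02, 2_000_000_000)
```

When the budget is the binding constraint, as here, the achievable error rate 
follows from the memory available rather than from the target.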



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
