[ https://issues.apache.org/jira/browse/FLINK-2907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Greg Hogan resolved FLINK-2907.
-------------------------------
    Resolution: Later

Closing this issue with current thoughts:

- this would only be applicable for batch, since the input must be fully materialized in order to generate and distribute the bloom filter
- without slot affinity, the bloom filters may not be readily shareable on a TaskManager
- even though the bloom filter is a compressed representation of the DataSet, for a large quantity of data the partitioned DataSet may fit in memory whereas the bloom filter may not
- without a priori knowledge of the size of the DataSet, either a large number of memory segments must be reserved or multiple bloom filters must be created with increasing size
- the tradeoff with a bloom filter is additional lookups in memory but less shuffling of data; it is not clear that these lookups (which for even a modest data set will not be in CPU cache) are more efficient than (de)serializing elements to/from network buffers (multiple bloom filters exacerbate this problem)

I still think this would be very interesting to research, but there are many other performance improvements I would like to prioritize.

> Bloom filter for Join
> ---------------------
>
>                 Key: FLINK-2907
>                 URL: https://issues.apache.org/jira/browse/FLINK-2907
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>              Labels: requires-design-doc
>
> A bloom filter can be a chainable operation for probe side Join elements. An
> element not matched by the bloom filter will not be serialized, shipped,
> deserialized, and processed.
> Generating the bloom filter is a chainable operation over hash side elements.
> The bloom filter created on each TaskManager must be the same size to allow
> combining by bitwise or. The most efficient means to distribute the bloom
> filter is to assign each TaskManager an equal partition that it will receive
> from all other TaskManagers.
> This will be broadcast once all local elements (by hashing) and remote
> partitions (by bitwise or) have been processed into that part of the bloom
> filter.
> An example with numbers: triangle listing/counting joining 2B edges on 149B
> two-paths, resulting in 21B triangles (this is using the optimal algorithm).
> At 8 bits per element the bloom filter will have a false-positive rate of ~2%
> and require a 2 GB bloom filter (stored once and shared per TaskManager).
> Each TaskManager both sends and receives data equivalent to the size of the
> bloom filter (minus the local partition, the size of which trends towards
> zero as the number of TaskManagers increases). The number of matched elements
> is 21B (true positives) + ~0.02*(149B-21B) = 23.5B, a reduction of 84% or 1.5
> TB (at 12 bytes per element). With 4 TaskManagers only 12 GB of bloom filter
> would be transmitted, a savings of 99.2%.
> Key issues are determining the size of the bloom filter (dependent on the
> count of hash side elements, the available memory segments, and the error
> rate) and whether this can be integrated with Join or must be a separate
> operator. This also depends on dynamic memory allocation, as spilling to disk
> would perform the serialization, write, read, and deserialization we are
> looking to avoid.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
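The sizing and savings arithmetic in the quoted example can be checked with a short sketch. This is purely illustrative (Python, not Flink code); `bloom_fpr` is a hypothetical helper using the standard optimal-k bloom filter formula, and all constants are taken from the example above:

```python
import math

def bloom_fpr(bits_per_element: float) -> float:
    """False-positive rate of a bloom filter using the optimal number of
    hash functions, k = round((m/n) * ln 2)."""
    k = round(bits_per_element * math.log(2))
    return (1.0 - math.exp(-k / bits_per_element)) ** k

B = 10 ** 9
hash_side = 2 * B      # 2B edges build the filter
probe_side = 149 * B   # 149B two-paths are probed against it
true_pos = 21 * B      # 21B triangles actually match

fpr = bloom_fpr(8)                   # ~0.02 at 8 bits per element
filter_bytes = hash_side * 8 // 8    # 8 bits/element -> 2 GB filter

# Probe elements that pass the filter and must still be shipped:
# all true positives plus the false-positive share of the rest.
shipped = true_pos + fpr * (probe_side - true_pos)   # ~23.5B elements
reduction = (probe_side - shipped) / probe_side      # ~84%
saved_bytes = (probe_side - shipped) * 12            # ~1.5 TB at 12 B/element

# Each of p TaskManagers sends and receives the filter minus its own
# partition, so total traffic approaches 2 * (p - 1) * filter_bytes.
p = 4
filter_traffic = 2 * (p - 1) * filter_bytes          # 12 GB
savings = 1 - filter_traffic / saved_bytes           # ~99.2%
```

Running the numbers this way reproduces the figures quoted above: roughly a 2% false-positive rate, an 84% reduction in shipped probe elements, and filter-exchange traffic near 1% of the bytes saved.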