[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #1020: PARQUET-2226 Support merge bloom filters

2023-01-15 Thread GitBox


wgtmac commented on code in PR #1020:
URL: https://github.com/apache/parquet-mr/pull/1020#discussion_r1070853151


##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java:
##
@@ -394,4 +395,24 @@ public long hash(float value) {
   public long hash(Binary value) {
 return hashFunction.hashBytes(value.getBytes());
   }
+
+  @Override
+  public void merge(BloomFilter otherBloomFilter) throws IOException {
+    Preconditions.checkArgument(otherBloomFilter != null, "Cannot merge a null BloomFilter");
+    Preconditions.checkArgument((getAlgorithm() == otherBloomFilter.getAlgorithm()),
+      "BloomFilters must have the same algorithm (%s != %s)",
+      getAlgorithm(), otherBloomFilter.getAlgorithm());
+    Preconditions.checkArgument((getHashStrategy() == otherBloomFilter.getHashStrategy()),
+      "BloomFilters must have the same hashStrategy (%s != %s)",
+      getHashStrategy(), otherBloomFilter.getHashStrategy());
+    Preconditions.checkArgument((getBitsetSize() == otherBloomFilter.getBitsetSize()),

Review Comment:
   Maybe we should put this check first to make it fail fast. Usually the parameters above do not change.



##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java:
##
@@ -394,4 +395,24 @@ public long hash(float value) {
   public long hash(Binary value) {
 return hashFunction.hashBytes(value.getBytes());
   }
+
+  @Override
+  public void merge(BloomFilter otherBloomFilter) throws IOException {

Review Comment:
   I still think it is helpful to extract the check logic into a separate method like `boolean canMergeFrom(BloomFilter other)`, because the user has to check these parameters anyway before calling `merge`. Then the `merge` implementation can simply call `canMergeFrom`.
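   A minimal sketch of the suggested split, using a simplified stand-in class rather than parquet-mr's actual `BlockSplitBloomFilter` (all names and fields here are illustrative): the compatibility checks become a reusable predicate, and `merge` delegates to it.

   ```java
   // Illustrative stand-in, not the real parquet-mr class: shows how a
   // canMergeFrom-style predicate lets callers test compatibility before merging.
   public class MergeCheckSketch {
     final String algorithm;     // stands in for BloomFilter.Algorithm
     final String hashStrategy;  // stands in for BloomFilter.HashStrategy
     final byte[] bitset;

     public MergeCheckSketch(String algorithm, String hashStrategy, int numBytes) {
       this.algorithm = algorithm;
       this.hashStrategy = hashStrategy;
       this.bitset = new byte[numBytes];
     }

     // True iff the two filters are structurally compatible for a bitwise-OR merge.
     public boolean canMergeFrom(MergeCheckSketch other) {
       return other != null
           && algorithm.equals(other.algorithm)
           && hashStrategy.equals(other.hashStrategy)
           && bitset.length == other.bitset.length;
     }

     public void merge(MergeCheckSketch other) {
       if (!canMergeFrom(other)) {
         throw new IllegalArgumentException("BloomFilters are not compatible for merging");
       }
       for (int i = 0; i < bitset.length; i++) {
         bitset[i] |= other.bitset[i];
       }
     }
   }
   ```

   With this shape, a caller can probe `canMergeFrom` first and fall back to writing a separate filter when two filters are incompatible, instead of catching an exception from `merge`.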



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #1020: PARQUET-2226 Support merge bloom filters

2023-01-15 Thread GitBox


wgtmac commented on code in PR #1020:
URL: https://github.com/apache/parquet-mr/pull/1020#discussion_r1070591970


##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java:
##
@@ -398,18 +398,21 @@ public long hash(Binary value) {
 
   @Override
   public void merge(BloomFilter otherBloomFilter) throws IOException {
-    Preconditions.checkArgument((otherBloomFilter.getAlgorithm() == getAlgorithm()),
-      "BloomFilter algorithm should be same");
-    Preconditions.checkArgument((otherBloomFilter.getHashStrategy() == getHashStrategy()),
-      "BloomFilter hashStrategy should be same");
-    Preconditions.checkArgument((otherBloomFilter.getBitsetSize() == getBitsetSize()),
-      "BloomFilter bitset size should be same");
+    Preconditions.checkArgument(otherBloomFilter != null, "Cannot merge a null BloomFilter");
+    Preconditions.checkArgument((getAlgorithm() == otherBloomFilter.getAlgorithm()),

Review Comment:
   nit: you don't have to write `String.format` explicitly
   ```
   Preconditions.checkArgument((getAlgorithm() == otherBloomFilter.getAlgorithm()),
     "BloomFilters must have the same algorithm (%s != %s)",
     getAlgorithm(), otherBloomFilter.getAlgorithm());
   ```



##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java:
##
@@ -394,4 +395,24 @@ public long hash(float value) {
   public long hash(Binary value) {
 return hashFunction.hashBytes(value.getBytes());
   }
+
+  @Override
+  public void merge(BloomFilter otherBloomFilter) throws IOException {
+    Preconditions.checkArgument(otherBloomFilter != null, "Cannot merge a null BloomFilter");
+    Preconditions.checkArgument((getAlgorithm() == otherBloomFilter.getAlgorithm()),
+      String.format("BloomFilters must have the same algorithm (%s != %s)",
+        getAlgorithm(), otherBloomFilter.getAlgorithm()));
+    Preconditions.checkArgument((getHashStrategy() == otherBloomFilter.getHashStrategy()),
+      String.format("BloomFilters must have the same hashStrategy (%s != %s)",
+        getHashStrategy(), otherBloomFilter.getHashStrategy()));
+    Preconditions.checkArgument((getBitsetSize() == otherBloomFilter.getBitsetSize()),
+      String.format("BloomFilters must have the same size of bitsets (%s != %s)",
+        getBitsetSize(), otherBloomFilter.getBitsetSize()));
+    ByteArrayOutputStream otherOutputStream = new ByteArrayOutputStream();
+    otherBloomFilter.writeTo(otherOutputStream);
+    byte[] otherBits = otherOutputStream.toByteArray();

Review Comment:
   Should we check that `bitset.length` equals `otherBits.length`?
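   A hedged sketch of the defensive check the question points at, over plain byte arrays (the helper and class names are invented for illustration, not parquet-mr code): compare lengths before OR'ing, since `getBitsetSize()` and the serialized byte count need not be checked by the same code path.

   ```java
   public class BitsetLengthCheckSketch {
     // OR-merges `other` into `target`, refusing mismatched lengths up front
     // (hypothetical helper mirroring the check the reviewer asks about).
     static void orInto(byte[] target, byte[] other) {
       if (target.length != other.length) {
         throw new IllegalArgumentException(
             "Bitset lengths differ: " + target.length + " != " + other.length);
       }
       for (int i = 0; i < target.length; i++) {
         target[i] |= other[i];
       }
     }
   }
   ```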



##
parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java:
##
@@ -181,6 +182,60 @@ public void testBloomFilterNDVs(){
 assertTrue(bytes < 5 * 1024 * 1024);
   }
 
+  @Test
+  public void testMergeBloomFilter() throws IOException {
+    Random random = new Random();
+    int numBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 1024, 0.01) / 8;
+    BloomFilter otherBloomFilter = new BlockSplitBloomFilter(numBytes);
+    BloomFilter mergedBloomFilter = new BlockSplitBloomFilter(numBytes);
+
+    Set<String> originStrings = new HashSet<>();
+    Set<String> testStrings = new HashSet<>();
+    Set<Integer> testInts = new HashSet<>();
+    Set<Double> testDoubles = new HashSet<>();
+    Set<Float> testFloats = new HashSet<>();
+    for (int i = 0; i < 1024; i++) {
+
+      String originStrValue = RandomStringUtils.randomAlphabetic(1, 64);
+      originStrings.add(originStrValue);
+      mergedBloomFilter.insertHash(otherBloomFilter.hash(Binary.fromString(originStrValue)));
+
+      String testString = RandomStringUtils.randomAlphabetic(1, 64);
+      testStrings.add(testString);
+      otherBloomFilter.insertHash(otherBloomFilter.hash(Binary.fromString(testString)));
+
+      int testInt = random.nextInt();
+      testInts.add(testInt);
+      otherBloomFilter.insertHash(otherBloomFilter.hash(testInt));
+
+      double testDouble = random.nextDouble();
+      testDoubles.add(testDouble);
+      otherBloomFilter.insertHash(otherBloomFilter.hash(testDouble));
+
+      float testFloat = random.nextFloat();
+      testFloats.add(testFloat);
+      otherBloomFilter.insertHash(otherBloomFilter.hash(testFloat));
+    }
+    mergedBloomFilter.merge(otherBloomFilter);
+    for (String testString : originStrings) {
+      assertTrue(mergedBloomFilter.findHash(mergedBloomFilter.hash(Binary.fromString(testString))));

Review Comment:
   In this case, I'd suggest not using random values.




[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #1020: PARQUET-2226 Support merge bloom filters

2023-01-15 Thread GitBox


wgtmac commented on code in PR #1020:
URL: https://github.com/apache/parquet-mr/pull/1020#discussion_r1070546933


##
parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java:
##
@@ -181,6 +182,60 @@ public void testBloomFilterNDVs(){
 assertTrue(bytes < 5 * 1024 * 1024);
   }
 
+  @Test
+  public void testMergeBloomFilter() throws IOException {
+    Random random = new Random();
+    int numBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 1024, 0.01) / 8;
+    BloomFilter otherBloomFilter = new BlockSplitBloomFilter(numBytes);
+    BloomFilter mergedBloomFilter = new BlockSplitBloomFilter(numBytes);
+
+    Set<String> originStrings = new HashSet<>();
+    Set<String> testStrings = new HashSet<>();
+    Set<Integer> testInts = new HashSet<>();
+    Set<Double> testDoubles = new HashSet<>();
+    Set<Float> testFloats = new HashSet<>();
+    for (int i = 0; i < 1024; i++) {
+
+      String originStrValue = RandomStringUtils.randomAlphabetic(1, 64);
+      originStrings.add(originStrValue);
+      mergedBloomFilter.insertHash(otherBloomFilter.hash(Binary.fromString(originStrValue)));
+
+      String testString = RandomStringUtils.randomAlphabetic(1, 64);
+      testStrings.add(testString);
+      otherBloomFilter.insertHash(otherBloomFilter.hash(Binary.fromString(testString)));
+
+      int testInt = random.nextInt();
+      testInts.add(testInt);
+      otherBloomFilter.insertHash(otherBloomFilter.hash(testInt));
+
+      double testDouble = random.nextDouble();
+      testDoubles.add(testDouble);
+      otherBloomFilter.insertHash(otherBloomFilter.hash(testDouble));
+
+      float testFloat = random.nextFloat();
+      testFloats.add(testFloat);
+      otherBloomFilter.insertHash(otherBloomFilter.hash(testFloat));
+    }
+    mergedBloomFilter.merge(otherBloomFilter);
+    for (String testString : originStrings) {
+      assertTrue(mergedBloomFilter.findHash(mergedBloomFilter.hash(Binary.fromString(testString))));

Review Comment:
   Can you add some cases like below to have more coverage:
   - value that is tested false on the original BF but true on the other side, and vice versa?
   - value that is tested false on both sides.
   - value that is tested true on both sides.
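   The cases above can be sketched deterministically with a toy bloom filter (this is illustration only, not parquet-mr's `BlockSplitBloomFilter`; the hash constants and names are invented): after an OR-merge, a value found in either input must be found in the merged filter.

   ```java
   import java.util.BitSet;

   // Toy two-hash bloom filter for illustrating merge coverage cases.
   public class MergeCoverageSketch {
     static final int NUM_BITS = 1 << 16;
     final BitSet bits = new BitSet(NUM_BITS);

     // Two cheap, deterministic hash functions (constants are arbitrary odd multipliers).
     static int h1(long v) { return (int) ((v * 0x9E3779B97F4A7C15L >>> 48) & (NUM_BITS - 1)); }
     static int h2(long v) { return (int) ((v * 0xC2B2AE3D27D4EB4FL >>> 48) & (NUM_BITS - 1)); }

     void insert(long v) { bits.set(h1(v)); bits.set(h2(v)); }
     boolean mightContain(long v) { return bits.get(h1(v)) && bits.get(h2(v)); }
     void merge(MergeCoverageSketch other) { bits.or(other.bits); }
   }
   ```

   Using fixed values such as 1, 2, and 3 (rather than random input) makes each coverage case reproducible: insert 1 only on the left, 2 only on the right, 3 on both, merge, and assert all three are found. Absence cannot be asserted safely, since a bloom filter may report false positives.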



##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java:
##
@@ -394,4 +395,21 @@ public long hash(float value) {
   public long hash(Binary value) {
 return hashFunction.hashBytes(value.getBytes());
   }
+
+  @Override
+  public void merge(BloomFilter otherBloomFilter) throws IOException {
-    Preconditions.checkArgument((otherBloomFilter.getAlgorithm() == getAlgorithm()),

Review Comment:
   It would be more user-friendly to print the mismatched values on both sides.



##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java:
##
@@ -176,4 +176,13 @@ public String toString() {
* @return compress algorithm that the bloom filter apply
*/
   Compression getCompression();
+
+  /**
+   * Combines this Bloom filter with another Bloom filter by performing a bitwise OR of the underlying bits
+   *
+   * @param otherBloomFilter The Bloom filter to combine this Bloom filter with.
+   */
+  default void merge(BloomFilter otherBloomFilter) throws IOException {
+    throw new UnsupportedOperationException("Not supported merge operation.");

Review Comment:
   ```suggestion
   throw new UnsupportedOperationException("Merge is not implemented.");
   ```



##
parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java:
##
@@ -181,6 +182,60 @@ public void testBloomFilterNDVs(){
 assertTrue(bytes < 5 * 1024 * 1024);
   }
 
+  @Test
+  public void testMergeBloomFilter() throws IOException {
+    Random random = new Random();
+    int numBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 1024, 0.01) / 8;
+    BloomFilter otherBloomFilter = new BlockSplitBloomFilter(numBytes);
+    BloomFilter mergedBloomFilter = new BlockSplitBloomFilter(numBytes);
+
+    Set<String> originStrings = new HashSet<>();

Review Comment:
   Better to split different types into separate test cases.






[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #1020: PARQUET-2226 Support merge bloom filters

2023-01-14 Thread GitBox


wgtmac commented on code in PR #1020:
URL: https://github.com/apache/parquet-mr/pull/1020#discussion_r1070233210


##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java:
##
@@ -394,4 +395,21 @@ public long hash(float value) {
   public long hash(Binary value) {
 return hashFunction.hashBytes(value.getBytes());
   }
+
+  @Override
+  public void putAll(BloomFilter otherBloomFilter) throws IOException {

Review Comment:
   Could you add some tests to verify if the merged BF is as expected?



##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java:
##
@@ -176,4 +176,10 @@ public String toString() {
* @return compress algorithm that the bloom filter apply
*/
   Compression getCompression();
+
+  /**
+   * Combines this Bloom filter with another Bloom filter by performing a bitwise OR of the underlying data
+   * @param otherBloomFilter The Bloom filter to combine this Bloom filter with.
+   */
+  void putAll(BloomFilter otherBloomFilter) throws IOException;

Review Comment:
   IMHO, the user needs to know whether two BFs are compatible before calling this function, so a utility function to test the compatibility of two BFs is also required. WDYT?



##
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java:
##
@@ -176,4 +176,10 @@ public String toString() {
* @return compress algorithm that the bloom filter apply
*/
   Compression getCompression();
+
+  /**
+   * Combines this Bloom filter with another Bloom filter by performing a bitwise OR of the underlying data
+   * @param otherBloomFilter The Bloom filter to combine this Bloom filter with.
+   */
+  void putAll(BloomFilter otherBloomFilter) throws IOException;

Review Comment:
   Rename to `merge`?


