Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2025-01-08 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2577713757

   > > I have retested the performance of TPC-DS 10s using the previously mentioned testing method. Please see the description part for the latest results.
   >
   > Cool, what about TPC-DS 100 SF?

   Please see the PR description for the TPC-DS 100s results.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org



Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2025-01-05 Thread via GitHub


yjhjstz commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2572320826

   > I have retested the performance of TPC-DS 10s using the previously mentioned testing method. Please see the description part for the latest results.

   Cool, what about TPC-DS 100 SF?





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2025-01-05 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2572299954

   > > From the TPC-DS 10s details table, there are some bad cases.
   >
   > **21,24,30,42,49,54,68-1,99**
   >
   > I retested these SQL statements that exhibited performance regression, and the latest test results show no noticeable performance difference when toggling **gp_enable_runtime_filter_pushdown**. So I suspect that the regression in these statements was an artifact of the testing method: previously I ran the entire suite of 99 TPC-DS queries once with **gp_enable_runtime_filter_pushdown** enabled, and then once with it disabled.
   >
   > A more appropriate method is to execute each SQL statement multiple times with **gp_enable_runtime_filter_pushdown** enabled and multiple times with it disabled, and then compare the averages of those runs. I will retest this way and check whether any performance regression remains.

   I have retested the performance of TPC-DS 10s using the testing method mentioned above. Please see the PR description for the latest results.





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2025-01-01 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2567252216

   > From the TPC-DS 10s details table, there are some bad cases.

   **21,24,30,42,49,54,68-1,99**

   I retested these SQL statements that exhibited performance regression, and the latest test results show no noticeable performance difference when toggling **gp_enable_runtime_filter_pushdown**. So I suspect that the regression in these statements was an artifact of the testing method: previously I ran the entire suite of 99 TPC-DS queries once with **gp_enable_runtime_filter_pushdown** enabled, and then once with it disabled.

   A more appropriate method is to execute each SQL statement multiple times with **gp_enable_runtime_filter_pushdown** enabled and multiple times with it disabled, and then compare the averages of those runs. I will retest this way and check whether any performance regression remains.
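
The averaging comparison described above could be sketched roughly as follows. This is only an illustration, not code from the PR: `mean_ms` and `relative_change` are hypothetical helper names, and the timing samples are assumed to have been collected separately (for example, one array of per-run timings with the GUC off and one with it on).

```c
#include <assert.h>
#include <stddef.h>

/* Arithmetic mean of n timing samples, in milliseconds. */
double
mean_ms(const double *samples, size_t n)
{
    double sum = 0.0;

    assert(n > 0);
    for (size_t i = 0; i < n; i++)
        sum += samples[i];
    return sum / (double) n;
}

/*
 * Relative change of the mean with the setting on versus off.
 * A negative result means the query was faster with the setting on.
 */
double
relative_change(const double *off, size_t n_off,
                const double *on, size_t n_on)
{
    double base = mean_ms(off, n_off);

    assert(base > 0.0);
    return (mean_ms(on, n_on) - base) / base;
}
```

Comparing per-query means rather than a single run per setting should reduce the influence of cache state and background noise, which is the concern raised about the earlier whole-suite runs.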





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-19 Thread via GitHub


yjhjstz commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2556016039

   From the TPC-DS 10s details table, there are some bad cases.





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-15 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1885681270


##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,14 @@ SeqNext(SeqScanState *node)
/*
 * get the next tuple from the table
 */
-   if (table_scan_getnextslot(scandesc, direction, slot))
+   while (table_scan_getnextslot(scandesc, direction, slot))
+   {
+   if (node->filter_in_seqscan && node->filters &&

Review Comment:
   Good idea! Fixed in https://github.com/apache/cloudberry/pull/724/commits/bcf93e66f72a35db0ab5694a92c59c768423b1ab






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-11 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1879796311


##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,14 @@ SeqNext(SeqScanState *node)
/*
 * get the next tuple from the table
 */
-   if (table_scan_getnextslot(scandesc, direction, slot))
+   while (table_scan_getnextslot(scandesc, direction, slot))
+   {
+   if (node->filter_in_seqscan && node->filters &&

Review Comment:
   ```c
   if (!node->filter_in_seqscan || !node->filters)
   {
       if (table_scan_getnextslot(scandesc, direction, slot))
           return slot;
   }
   else
   {
       while (table_scan_getnextslot(scandesc, direction, slot))
       {
           ...
       }
   }
   ```

   Would this make the original path more efficient and readable?
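
To make the suggested control flow concrete, here is a small self-contained sketch of the same pattern outside the executor. Everything in it is a hypothetical stand-in: `MockScan`, `mock_getnext`, `mock_pass_filter` and `mock_seq_next` are not Cloudberry APIs, and the "filter" is a toy threshold rather than a Bloom filter.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the scan state; not the real SeqScanState. */
typedef struct MockScan
{
    const int  *values;      /* rows of a one-column table */
    size_t      nrows;
    size_t      pos;         /* index of the next row to return */
    bool        use_filter;  /* plays the role of filter_in_seqscan && filters */
    int         min_key;     /* toy filter: keep values >= min_key */
} MockScan;

/* Fetch the next raw row; returns false at end of scan. */
bool
mock_getnext(MockScan *scan, int *out)
{
    if (scan->pos >= scan->nrows)
        return false;
    *out = scan->values[scan->pos++];
    return true;
}

/* Toy predicate standing in for the runtime filter probe. */
bool
mock_pass_filter(const MockScan *scan, int value)
{
    return value >= scan->min_key;
}

/* Return the next qualifying row, or false when the scan is exhausted. */
bool
mock_seq_next(MockScan *scan, int *out)
{
    if (!scan->use_filter)
    {
        /* Original path: a single fetch, with no per-row filter checks. */
        return mock_getnext(scan, out);
    }

    /* Filtered path: keep fetching until a row passes the filter. */
    while (mock_getnext(scan, out))
    {
        if (mock_pass_filter(scan, *out))
            return true;
    }
    return false;
}
```

The point of the split is that scans without a runtime filter never enter the loop and never evaluate the filter condition per row.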






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-10 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1878231460


##
src/test/regress/expected/gp_runtime_filter.out:
##
@@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf
   1600
 (1 row)
 
+-- Test bloom filter pushdown
+DROP TABLE IF EXISTS t1;
+NOTICE:  table "t1" does not exist, skipping
+DROP TABLE IF EXISTS t2;
+NOTICE:  table "t2" does not exist, skipping
+CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1);
+CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED;
+INSERT INTO t1 VALUES (5,5,5,5,5);
+INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+ANALYZE;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+                                        QUERY PLAN
+-------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         ->  Seq Scan on t1 (actual rows=128 loops=1)
+         ->  Hash (actual rows=32 loops=1)
+               Buckets: 524288  Batches: 1  Memory Usage: 4098kB
+               ->  Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+                                        QUERY PLAN
+-------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         ->  Seq Scan on t1 (actual rows=1 loops=1)
+         ->  Hash (actual rows=32 loops=1)
+               Buckets: 524288  Batches: 1  Memory Usage: 4098kB

Review Comment:
   > The test case should cover the case where a hash join node or result node is the child of the parent hash join.

   Fixed in https://github.com/apache/cloudberry/pull/724/commits/5885683b5563a6335029a34d6ae1429cf178c78a and https://github.com/apache/cloudberry/pull/724/commits/768f6230d051580f9f16f0e2e1d9395b3d2faacf






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-10 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1878231460


##
src/test/regress/expected/gp_runtime_filter.out:
##

Review Comment:
   > The test case should cover the case where a hash join node or result node is the child of the parent hash join.

   Added test cases in https://github.com/apache/cloudberry/pull/724/commits/5885683b5563a6335029a34d6ae1429cf178c78a and https://github.com/apache/cloudberry/pull/724/commits/768f6230d051580f9f16f0e2e1d9395b3d2faacf






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-10 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1878231460


##
src/test/regress/expected/gp_runtime_filter.out:
##

Review Comment:
   > The test case should cover the case where a hash join node or result node is the child of the parent hash join.

   Fixed in https://github.com/apache/cloudberry/pull/724/commits/5885683b5563a6335029a34d6ae1429cf178c78a






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-10 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2530888254

   > There are code changes in MultiExecParallelHash; please add some parallel tests with the runtime filter.

   Fixed in https://github.com/apache/cloudberry/pull/724/commits/7ab040ae178e9bb616bd75e0488aa6a2293d4183





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-07 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2525105120

   > Hi @zhangyue-hashdata, I see that the previous runtime filter implementation relies on a cost model in try_runtime_filter(). Do I understand correctly that this PR does not do any cost evaluation? Also, for TPC-H/TPC-DS, can you provide results for each query separately?
   >
   > Asking mostly out of curiosity; I see there are quite a few reviewers here already :)

   Basically, you're correct. Our goal is to filter out as much data as possible right at the point where it is produced. A full cost evaluation at that point would be very complex, so we only make a simple estimate based on the row count and work memory when creating the Bloom filter.
   Furthermore, I have placed the detailed test results for TPC-DS 10s in the PR description.
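
For readers unfamiliar with this kind of estimate, the sketch below shows one plausible way to size a Bloom filter from a row estimate and a memory budget. It is an assumption-laden illustration, not the PR's code: the function name `sketch_bloom_bits`, the figure of 16 bits per expected row, the 64-bit floor, and the power-of-two rounding are all choices made here for the example.

```c
#include <stdint.h>

/*
 * Pick a Bloom filter size in bits from an estimated row count and a
 * memory budget in kilobytes.
 *
 * Assumptions (not taken from the PR): aim for about 16 bits per
 * expected row, never exceed the budget, use at least 64 bits, and
 * round down to a power of two so a hash value can be masked rather
 * than reduced with a modulo.
 */
uint64_t
sketch_bloom_bits(uint64_t est_rows, uint64_t budget_kb)
{
    /* Clamp instead of overflowing on very large inputs. */
    uint64_t want_bits = est_rows > UINT64_MAX / 16 ?
        UINT64_MAX : est_rows * 16;
    uint64_t cap_bits = budget_kb > UINT64_MAX / 8192 ?
        UINT64_MAX : budget_kb * 8192;
    uint64_t bits = want_bits < cap_bits ? want_bits : cap_bits;
    uint64_t pow2 = 1;

    if (bits < 64)
        bits = 64;

    /* Largest power of two that does not exceed bits. */
    while (pow2 <= bits / 2)
        pow2 *= 2;
    return pow2;
}
```

With a small build side the row estimate decides the size; with a large build side the memory budget does, at the price of a higher false-positive rate.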





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-06 Thread via GitHub


fanfuxiaoran commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1872817255


##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
/*
 * get the next tuple from the table
 */
-   if (table_scan_getnextslot(scandesc, direction, slot))
+   while (table_scan_getnextslot(scandesc, direction, slot))
+   {
+   if (TupIsNull(slot))
+   return slot;
+
+   if (node->filter_in_seqscan && node->filters &&
+   !PassByBloomFilter(node, slot))

Review Comment:
   > It determines whether using a Bloom filter for filtering data would be effective based on this evaluation

   That makes sense, but where is the related code? I didn't see it in this PR.
   Does it compare the number of rows in the hash table with the number of rows in the probe table? If the hash table has far fewer rows than the probe table, is the runtime filter then used?






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-05 Thread via GitHub


Smyatkin-Maxim commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2520691766

   Hi @zhangyue-hashdata
   I see that the previous runtime filter implementation relies on a cost model in try_runtime_filter(). Do I understand correctly that this PR does not do any cost evaluation?
   Also, for TPC-H/TPC-DS, can you provide results for each query separately?

   Asking mostly out of curiosity; I see there are quite a few reviewers here already :)





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-05 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2520535244

   > ```
   > explain analyze
   > SELECT count(t1.c3) FROM t1, t3 WHERE t1.c1 = t3.c1 ;
   >                                   QUERY PLAN
   > -------------------------------------------------------------------------------
   >  Finalize Aggregate  (cost=1700.07..1700.08 rows=1 width=8) (actual time=32119.566..32119.571 rows=1 loops=1)
   >    ->  Gather Motion 3:1  (slice1; segments: 3)  (cost=1700.02..1700.07 rows=3 width=8) (actual time=30.967..32119.550 rows=3 loops=1)
   >          ->  Partial Aggregate  (cost=1700.02..1700.03 rows=1 width=8) (actual time=32119.131..32119.135 rows=1 loops=1)
   >                ->  Hash Join  (cost=771.01..1616.68 rows=4 width=4) (actual time=14.059..32116.962 rows=33462 loops=1)
   >                      Hash Cond: (t3.c1 = t1.c1)
   >                      Extra Text: (seg0)   Hash chain length 1.0 avg, 3 max, using 32439 of 524288 buckets.
   >                      ->  Seq Scan on t3  (cost=0.00..387.34 rows=4 width=4) (actual time=0.028..32089.490 rows=33462 loops=1)
   >                      ->  Hash  (cost=354.34..354.34 rows=4 width=8) (actual time=13.257..13.259 rows=33462 loops=1)
   >                            Buckets: 524288  Batches: 1  Memory Usage: 5404kB
   >                            ->  Seq Scan on t1  (cost=0.00..354.34 rows=4 width=8) (actual time=0.180..4.877 rows=33462 loops=1)
   >  Planning Time: 0.227 ms
   > ```
   >
   > runtime_filter has been pushed down to t3 table seqscan, but 'explain analyze' doesn't print them out.
   > 
   > ```
   > \d t1
   >                  Table "public.t1"
   >  Column |  Type   | Collation | Nullable | Default
   > --------+---------+-----------+----------+---------
   >  c1     | integer |           |          |
   >  c2     | integer |           |          |
   >  c3     | integer |           |          |
   >  c4     | integer |           |          |
   >  c5     | integer |           |          |
   > Checksum: t
   > Indexes:
   >     "t1_c2" btree (c2)
   > Distributed by: (c1)
   > ```
   >
   > ```
   > \d t3
   >                  Table "public.t3"
   >  Column |  Type   | Collation | Nullable | Default
   > --------+---------+-----------+----------+---------
   >  c1     | integer |           |          |
   >  c2     | integer |           |          |
   >  c3     | integer |           |          |
   >  c4     | integer |           |          |
   >  c5     | integer |           |          |
   > Distributed by: (c1)
   > ```
   
   Thanks for your test case. Based on it, I rewrote the code to ensure that the debug info is always displayed, even when the number of filtered rows is zero. I also added the test case to gp_runtime_filter.sql.
   Fixed in https://github.com/apache/cloudberry/commit/98dac6dfc7d5e44e111aa16bdf5948d07ee2eb00





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-05 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1871535784


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber  lattno, rattno;
+   Expr       *expr;
+   JoinType    jointype;
+   HashJoin   *hj;
+   HashState  *hstate;
+   PlanState  *target;
+   AttrFilter *attr_filter;
+   ListCell   *lc;
+
+   /*
+* Only applicatable for inner, right and semi join,
+*/

Review Comment:
   Added more comments to explain why only inner, right and semi joins are allowed to use the runtime filter.
   Fixed in https://github.com/apache/cloudberry/commit/98dac6dfc7d5e44e111aa16bdf5948d07ee2eb00
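
One way to read the restriction: a filter pushed to the probe-side scan discards probe rows that have no match in the hash table, which is only safe when the join would never have emitted such rows anyway. The sketch below encodes that reasoning. The enum and function are hypothetical names invented for this illustration; they are not the executor's `JoinType` or any function in the PR, and the sketch assumes the usual hash join convention that "right" preserves the hashed (inner) side.

```c
#include <stdbool.h>

/* Hypothetical join kinds for illustration; not the real JoinType enum. */
typedef enum SketchJoinKind
{
    SK_INNER,
    SK_LEFT,
    SK_RIGHT,
    SK_FULL,
    SK_SEMI,
    SK_ANTI
} SketchJoinKind;

/*
 * Is it safe to drop probe-side rows that cannot match the hash table?
 *
 * Inner, right and semi joins never output an unmatched probe row, so
 * dropping it early does not change the result. Left and full joins
 * must emit unmatched probe rows (null-extended), and an anti join
 * emits exactly the unmatched probe rows, so filtering would lose them.
 */
bool
sketch_probe_filter_is_safe(SketchJoinKind kind)
{
    switch (kind)
    {
        case SK_INNER:
        case SK_RIGHT:
        case SK_SEMI:
            return true;
        case SK_LEFT:
        case SK_FULL:
        case SK_ANTI:
            return false;
    }
    return false;
}
```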






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-05 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2520542895

   > Hi, with gp_enable_runtime_filter_pushdown = on, executing the SQL below causes a crash:
   >
   > ```sql
   > gpadmin=# show gp_enable_runtime_filter_pushdown;
   >  gp_enable_runtime_filter_pushdown
   > -----------------------------------
   >  on
   > (1 row)
   > ```
   >
   > ```sql
   > CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist);
   > -- use fillfactor so we don't have to load too much data to get multiple pages
   >
   > -- Changed the column length in order to match the expected results based on relation's blocksz
   > INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i;
   > INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i;
   > INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i;
   > EXPLAIN (COSTS OFF)
   >   SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2);
   > FATAL:  Unexpected internal error (assert.c:48)
   > DETAIL:  FailedAssertion("IsA(planstate, SeqScanState)", File: "explain.c", Line: 4154)
   > server closed the connection unexpectedly
   > This probably means the server terminated abnormally
   > before or while processing the request.
   > The connection to the server was lost. Attempting reset: Succeeded.
   > psql (14.4, server 14.4)
   > ```
   
   Thanks for your test case. I fixed it in https://github.com/apache/cloudberry/commit/98dac6dfc7d5e44e111aa16bdf5948d07ee2eb00 and added the test case to gp_runtime_filter.sql too.





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


fanfuxiaoran commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2519492174

   ```
   explain analyze
   SELECT count(t1.c3) FROM t1, t3 WHERE t1.c1 = t3.c1 ;
                                     QUERY PLAN
   -------------------------------------------------------------------------------
    Finalize Aggregate  (cost=1700.07..1700.08 rows=1 width=8) (actual time=32119.566..32119.571 rows=1 loops=1)
      ->  Gather Motion 3:1  (slice1; segments: 3)  (cost=1700.02..1700.07 rows=3 width=8) (actual time=30.967..32119.550 rows=3 loops=1)
            ->  Partial Aggregate  (cost=1700.02..1700.03 rows=1 width=8) (actual time=32119.131..32119.135 rows=1 loops=1)
                  ->  Hash Join  (cost=771.01..1616.68 rows=4 width=4) (actual time=14.059..32116.962 rows=33462 loops=1)
                        Hash Cond: (t3.c1 = t1.c1)
                        Extra Text: (seg0)   Hash chain length 1.0 avg, 3 max, using 32439 of 524288 buckets.
                        ->  Seq Scan on t3  (cost=0.00..387.34 rows=4 width=4) (actual time=0.028..32089.490 rows=33462 loops=1)
                        ->  Hash  (cost=354.34..354.34 rows=4 width=8) (actual time=13.257..13.259 rows=33462 loops=1)
                              Buckets: 524288  Batches: 1  Memory Usage: 5404kB
                              ->  Seq Scan on t1  (cost=0.00..354.34 rows=4 width=8) (actual time=0.180..4.877 rows=33462 loops=1)
    Planning Time: 0.227 ms
   ```
   runtime_filter has been pushed down to t3 table seqscan, but 'explain analyze' doesn't print them out.

   ```
   \d t1
                    Table "public.t1"
    Column |  Type   | Collation | Nullable | Default
   --------+---------+-----------+----------+---------
    c1     | integer |           |          |
    c2     | integer |           |          |
    c3     | integer |           |          |
    c4     | integer |           |          |
    c5     | integer |           |          |
   Checksum: t
   Indexes:
       "t1_c2" btree (c2)
   Distributed by: (c1)
   ```
   ```
   \d t3
                    Table "public.t3"
    Column |  Type   | Collation | Nullable | Default
   --------+---------+-----------+----------+---------
    c1     | integer |           |          |
    c2     | integer |           |          |
    c3     | integer |           |          |
    c4     | integer |           |          |
    c5     | integer |           |          |
   Distributed by: (c1)
   ```





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


fanfuxiaoran commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2519482732

   Thanks for your detailed explanation. 
   
   > > Looks interesting. And I have some questions to discuss.
   > > 
   > > * Beside the seqscan, can the runtime filter apply to other types of 
scan? such as the index scan.
   > > * Looks only when the `hashjoin` node and `seqscan` node run in the same 
process can use the runtime filter. Which means the tables should have same 
distributed policy on the join columns or one of the table is replicated.
   > 
   > * Beside the seqscan, can the runtime filter apply to other types of scan? 
such as the index scan.
   > 
   > > Theoretically, it is feasible to apply runtime filters to operators such 
as Index Scan. However, because Index Scan already reduces data volume by 
leveraging an optimized storage structure, the performance gains from applying 
runtime filters to Index Scan would likely be minimal. Thus, I think that 
applying runtime filters to Index Scan would not yield significant performance 
benefits.
   > 
   Makes sense. In a hash join, an index scan or index-only scan is rarely 
used on the probe side.
   
   > > In subsequent work, when we discover that other scan operators can 
achieve notable performance improvements from pushdown runtime filters, we will 
support these operators. Our focus will be on operators where runtime filters 
can substantially decrease the amount of data processed early in the query 
execution, leading to more pronounced performance enhancements.
   > 
   > * Looks only when the `hashjoin` node and `seqscan` node run in the same 
process can use the runtime filter. Which means the tables should have same 
distributed policy on the join columns or one of the table is replicated.
   > 
   > > Yes, the current pushdown runtime filter only supports in-process 
pushdown, which means that the Hash Join and SeqScan need to be within the same 
process. The design and implementation of cross-process pushdown runtime 
filters are much more complex.
   > 
   > > This limitation arises because coordinating and sharing data structures 
like Bloom filters or other runtime filters across different processes involves 
additional challenges such as inter-process communication (IPC), 
synchronization, and ensuring consistency and efficiency of the filters across 
process boundaries. Addressing these issues requires a more sophisticated 
design that can handle the complexities of distributed computing environments.
   
   Exactly, and if a lock is used to solve the problem, it may even lead to 
worse performance.
   
   





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2519287605

   > ```sql
   > gpadmin=# show gp_enable_runtime_filter_pushdown;
   >  gp_enable_runtime_filter_pushdown
   > -----------------------------------
   >  on
   > (1 row)
   > ```
   >
   > ```sql
   > CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist);
   > -- use fillfactor so we don't have to load too much data to get multiple pages
   >
   > -- Changed the column length in order to match the expected results based on relation's blocksz
   > INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i;
   > INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i;
   > INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i;
   > EXPLAIN (COSTS OFF)
   >   SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2);
   > FATAL:  Unexpected internal error (assert.c:48)
   > DETAIL:  FailedAssertion("IsA(planstate, SeqScanState)", File: "explain.c", Line: 4154)
   > server closed the connection unexpectedly
   > This probably means the server terminated abnormally
   > before or while processing the request.
   > The connection to the server was lost. Attempting reset: Succeeded.
   > psql (14.4, server 14.4)
   > ```
   
   Thanks, I'll reproduce the issue and fix it.





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


avamingli commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2519274062

   Hi, with gp_enable_runtime_filter_pushdown = on, executing the SQL below 
causes a crash:
   
   ```sql
   gpadmin=# show gp_enable_runtime_filter_pushdown;
    gp_enable_runtime_filter_pushdown
   -----------------------------------
    on
   (1 row)
   ```
   ```sql
   CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist);
   -- use fillfactor so we don't have to load too much data to get multiple pages

   -- Changed the column length in order to match the expected results based on relation's blocksz
   INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i;
   INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i;
   INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i;
   EXPLAIN (COSTS OFF)
     SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2);
   FATAL:  Unexpected internal error (assert.c:48)
   DETAIL:  FailedAssertion("IsA(planstate, SeqScanState)", File: "explain.c", Line: 4154)
   server closed the connection unexpectedly
   This probably means the server terminated abnormally
   before or while processing the request.
   The connection to the server was lost. Attempting reset: Succeeded.
   psql (14.4, server 14.4)
   ```
   
   https://github.com/user-attachments/assets/a8882208-1800-4df3-9f2d-7c659e24aaa3





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


fanfuxiaoran commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1870569206


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+    * Only applicable for inner, right and semi join,
+    */
+   jointype = hjstate->js.jointype;
+   if (jointype != JOIN_INNER
+   && jointype != JOIN_RIGHT
+   && jointype != JOIN_SEMI
+  )
+   return;
+
+   hstate = castNode(HashState, innerPlanState(hjstate));
+   hstate->filters = NIL;
+
+   /*
+* check and initialize the runtime filter for all hash conds in
+* hj->hashclauses
+*/
+   hj = castNode(HashJoin, hjstate->js.ps.plan);
+   foreach (lc, hj->hashclauses)
+   {
+   expr = (Expr *)lfirst(lc);
+
+   if (!IsEqualOp(expr))
+   continue;
+
+   lattno = -1;
+   rattno = -1;
+   if (!CheckEqualArgs(expr, &lattno, &rattno))
+   continue;
+
+   if (lattno < 1 || rattno < 1)
+   continue;
+
+   target = FindTargetAttr(hjstate, lattno, &lattno);
+   if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+   continue;
+   Assert(IsA(target, SeqScanState));
+
+   attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                  hstate->ps.plan->plan_rows);
+   if (attr_filter->blm_filter)
+   hstate->filters = lappend(hstate->filters, attr_filter);
+   else
+   pfree(attr_filter);
+   }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+   Oid funcid = InvalidOid;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   funcid = ((OpExpr *)expr)->opfuncid;
+   else if (IsA(expr, FuncExpr))
+   funcid = ((FuncExpr *)expr)->funcid;
+   else
+   return false;
+
+   if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+   || funcid == F_INT24EQ || funcid == F_INT42EQ
+   || funcid == F_INT28EQ || funcid == F_INT82EQ
+   || funcid == F_INT48EQ || funcid == F_INT84EQ
+  )
+   return true;
+
+   return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+   Var         *var;
+   bool         match;
+   List        *args;
+   ListCell    *lc;
+
+   if (lattno == NULL || rattno == NULL)
+   return false;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   args = ((OpExpr *)expr)->args;
+   else if (IsA(expr, FuncExpr))
+   args = ((FuncExpr *)expr)->args;
+   else
+   return false;
+
+   if (!args || list_length(args) != 2)
+   return false;
+
+   match = false;
+   foreach (lc, args)
+   {
+   match = false;
+
+   if (!IsA(lfirst(lc), Var))

Review Comment:
   Sorry, I didn't make it clear. I don't mean a predicate on the var. I mean 
SQL like the following:
   ```
   EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
   SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = (t2.c2 + 10);
                                  QUERY PLAN
   ----------------------------------------------------------------------------
    Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
      ->  Hash Join (actual rows=0 loops=1)
            Hash Cond: (t1.c2 = (t2.c2 + 10))
            Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
            ->  Seq Scan on t1 (actual rows=128 loops=1)
            ->  Hash (actual rows=32 loops=1)
                  Buckets: 524288  Batches: 1  Memory Usage: 4098kB
                  ->  Seq Scan on t2 (actual rows=32 loops=1)
    Optimizer: Postgres query optimizer
   (9 rows)
   ```
   As `t2.c2 + 10` is not a `Var` but a `T_OpExpr` , the runtime filter cannot 
ha

Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2517322395

   > Looks interesting. And I have some questions to discuss.
   > 
   > * Beside the seqscan, can the runtime filter apply to other types of scan? 
such as the index scan.
   > * Looks only when the `hashjoin` node and `seqscan` node run in the same 
process can use the runtime filter. Which means the tables should have same 
distributed policy on the join columns or one of the table is replicated.
   
   * Beside the seqscan, can the runtime filter apply to other types of scan? 
such as the index scan.
   > Theoretically, it is feasible to apply runtime filters to operators such 
as Index Scan. However, because Index Scan already reduces data volume by 
leveraging an optimized storage structure, the performance gains from applying 
runtime filters to Index Scan would likely be minimal. Thus, I think that 
applying runtime filters to Index Scan would not yield significant performance 
benefits.
   
   > In subsequent work, when we discover that other scan operators can achieve 
notable performance improvements from pushdown runtime filters, we will support 
these operators. Our focus will be on operators where runtime filters can 
substantially decrease the amount of data processed early in the query 
execution, leading to more pronounced performance enhancements.
   
   * Looks only when the `hashjoin` node and `seqscan` node run in the same 
process can use the runtime filter. Which means the tables should have same 
distributed policy on the join columns or one of the table is replicated.
   > Yes, the current pushdown runtime filter only supports in-process 
pushdown, which means that the Hash Join and SeqScan need to be within the same 
process. The design and implementation of cross-process pushdown runtime 
filters are much more complex.
   
   > This limitation arises because coordinating and sharing data structures 
like Bloom filters or other runtime filters across different processes involves 
additional challenges such as inter-process communication (IPC), 
synchronization, and ensuring consistency and efficiency of the filters across 
process boundaries. Addressing these issues requires a more sophisticated 
design that can handle the complexities of distributed computing environments.
   
   





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1869442905


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+    * Only applicable for inner, right and semi join,
+    */
+   jointype = hjstate->js.jointype;
+   if (jointype != JOIN_INNER
+   && jointype != JOIN_RIGHT
+   && jointype != JOIN_SEMI
+  )
+   return;
+
+   hstate = castNode(HashState, innerPlanState(hjstate));
+   hstate->filters = NIL;
+
+   /*
+* check and initialize the runtime filter for all hash conds in
+* hj->hashclauses
+*/
+   hj = castNode(HashJoin, hjstate->js.ps.plan);
+   foreach (lc, hj->hashclauses)
+   {
+   expr = (Expr *)lfirst(lc);
+
+   if (!IsEqualOp(expr))
+   continue;
+
+   lattno = -1;
+   rattno = -1;
+   if (!CheckEqualArgs(expr, &lattno, &rattno))
+   continue;
+
+   if (lattno < 1 || rattno < 1)
+   continue;
+
+   target = FindTargetAttr(hjstate, lattno, &lattno);
+   if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+   continue;
+   Assert(IsA(target, SeqScanState));
+
+   attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                  hstate->ps.plan->plan_rows);
+   if (attr_filter->blm_filter)
+   hstate->filters = lappend(hstate->filters, attr_filter);
+   else
+   pfree(attr_filter);
+   }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+   Oid funcid = InvalidOid;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   funcid = ((OpExpr *)expr)->opfuncid;
+   else if (IsA(expr, FuncExpr))
+   funcid = ((FuncExpr *)expr)->funcid;
+   else
+   return false;
+
+   if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+   || funcid == F_INT24EQ || funcid == F_INT42EQ
+   || funcid == F_INT28EQ || funcid == F_INT82EQ
+   || funcid == F_INT48EQ || funcid == F_INT84EQ
+  )
+   return true;
+
+   return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+   Var         *var;
+   bool         match;
+   List        *args;
+   ListCell    *lc;
+
+   if (lattno == NULL || rattno == NULL)
+   return false;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   args = ((OpExpr *)expr)->args;
+   else if (IsA(expr, FuncExpr))
+   args = ((FuncExpr *)expr)->args;
+   else
+   return false;
+
+   if (!args || list_length(args) != 2)
+   return false;
+
+   match = false;
+   foreach (lc, args)
+   {
+   match = false;
+
+   if (!IsA(lfirst(lc), Var))

Review Comment:
   I think that expressions like `t1.c1 = 5` should be pushed down by the 
optimizer to operators such as SeqScan for early processing. Therefore, this 
feature does not handle expressions of the form `t1.c1 = 5`.






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856482708


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   List        *scankeys;
+   ScanKey      sk;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+       scankeys = NIL;
+
+       attr_filter = lfirst(lc);
+       if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+           continue;
+
+       /* bloom filter */
+       sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+       sk->sk_flags    = SK_BLOOM_FILTER;
+       sk->sk_attno    = attr_filter->lattno;
+       sk->sk_subtype  = INT8OID;
+       sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+       scankeys = lappend(scankeys, sk);
+
+       /* range filter */
+       sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+       sk->sk_flags    = 0;
+       sk->sk_attno    = attr_filter->lattno;
+       sk->sk_strategy = BTGreaterEqualStrategyNumber;
+       sk->sk_subtype  = INT8OID;
+       sk->sk_argument = attr_filter->min;
+       scankeys = lappend(scankeys, sk);
+
+       sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+       sk->sk_flags    = 0;
+       sk->sk_attno    = attr_filter->lattno;
+       sk->sk_strategy = BTLessEqualStrategyNumber;
+       sk->sk_subtype  = INT8OID;
+       sk->sk_argument = attr_filter->max;
+       scankeys = lappend(scankeys, sk);
+
+       /* append new runtime filters to target node */
+       SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+       sss->filters = list_concat(sss->filters, scankeys);

Review Comment:
   1. Combining Bloom filters will result in a higher False Positive Rate (FPR) 
compared to using each of the individual Bloom filters separately, so it is not 
recommended;
   2. Combining range filters has the same problem as combining Bloom 
filters;
   3. ~~There is only one Bloom filter and one range filter on the same 
attribute in many cases;~~
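
To illustrate point 1, here is a minimal, self-contained sketch. It is not code from this PR. It assumes that "combining" means OR-ing two bit arrays of the same size. `DemoBloom`, `demo_add`, `demo_union`, and the other names are hypothetical, and the two-probe hashing is a toy stand-in for the real Bloom filter implementation. The union never has fewer bits set than either input, so a probe key that is absent from both build sides is at least as likely to hit only set bits.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define DEMO_BITS  4096u              /* power of two, so masking selects a bit */
#define DEMO_WORDS (DEMO_BITS / 64u)

/* Hypothetical toy Bloom filter; not the structure used by the PR. */
typedef struct DemoBloom
{
    uint64_t words[DEMO_WORDS];
} DemoBloom;

/* splitmix64 finalizer, used here only as a cheap bit mixer. */
static uint64_t
demo_mix(uint64_t x)
{
    x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9ULL;
    x = (x ^ (x >> 27)) * 0x94d049bb133111ebULL;
    return x ^ (x >> 31);
}

/* Derive the two bit positions probed for a key. */
static void
demo_positions(int64_t key, uint32_t *p1, uint32_t *p2)
{
    assert(p1 != NULL && p2 != NULL);
    *p1 = (uint32_t) (demo_mix((uint64_t) key) & (DEMO_BITS - 1u));
    *p2 = (uint32_t) (demo_mix((uint64_t) key ^ 0x9e3779b97f4a7c15ULL) & (DEMO_BITS - 1u));
}

static void
demo_init(DemoBloom *b)
{
    memset(b->words, 0, sizeof(b->words));
}

static void
demo_add(DemoBloom *b, int64_t key)
{
    uint32_t p1, p2;

    demo_positions(key, &p1, &p2);
    b->words[p1 / 64u] |= UINT64_C(1) << (p1 % 64u);
    b->words[p2 / 64u] |= UINT64_C(1) << (p2 % 64u);
}

/* True means "possibly present"; false means "definitely absent". */
static bool
demo_test(const DemoBloom *b, int64_t key)
{
    uint32_t p1, p2;

    demo_positions(key, &p1, &p2);
    return ((b->words[p1 / 64u] >> (p1 % 64u)) & 1u) &&
           ((b->words[p2 / 64u] >> (p2 % 64u)) & 1u);
}

/* Combine two filters of equal size by OR-ing their bit arrays. */
static void
demo_union(DemoBloom *dst, const DemoBloom *a, const DemoBloom *b)
{
    for (uint32_t i = 0; i < DEMO_WORDS; i++)
        dst->words[i] = a->words[i] | b->words[i];
}

/* Count set bits; a denser filter passes more non-matching keys. */
static unsigned
demo_bits_set(const DemoBloom *b)
{
    unsigned n = 0;

    for (uint32_t i = 0; i < DEMO_WORDS; i++)
        for (uint64_t w = b->words[i]; w != 0; w &= w - 1)
            n++;
    return n;
}
```

Checking a probe key against the two filters separately requires it to pass both, while the union accepts anything that either filter would accept, which is why keeping the filters separate is the safer default here.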






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1869418850


##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
/*
 * get the next tuple from the table
 */
-   if (table_scan_getnextslot(scandesc, direction, slot))
+   while (table_scan_getnextslot(scandesc, direction, slot))
+   {
+   if (TupIsNull(slot))
+   return slot;
+
+   if (node->filter_in_seqscan && node->filters &&
+   !PassByBloomFilter(node, slot))

Review Comment:
   Yes, when creating the Bloom filter, the system evaluates the estimated 
number of rows that this hash join will process and the amount of available 
memory during the execution plan generation. It determines whether using a 
Bloom filter for filtering data would be effective based on this evaluation. If 
it is assessed that the Bloom filter would not sufficiently enhance 
performance, then the Bloom filter will not be created.






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-12-04 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1869401110


##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
/*
 * get the next tuple from the table
 */
-   if (table_scan_getnextslot(scandesc, direction, slot))
+   while (table_scan_getnextslot(scandesc, direction, slot))
+   {
+   if (TupIsNull(slot))
+   return slot;
+
+   if (node->filter_in_seqscan && node->filters &&
+   !PassByBloomFilter(node, slot))

Review Comment:
   **Similarities:**
   In this feature, the creation and testing of the Bloom filter use the same 
implementation as that of the gp_enable_runtime_filter feature.
   
   **Differences:**
   
   - The gp_enable_runtime_filter feature decides during the optimizer phase 
whether to use a Bloom filter to filter data. It inserts the functionality of 
filtering data with a Bloom filter as a nodeRuntimeFilter node into the left 
subtree of a Hash Join.
   
   - In contrast, this new feature does not insert a node. Instead, it 
directly pushes down the Bloom filter and other filters to the SeqScan or even 
the Access Method (AM), applying the filters closer to where the data is 
generated. This approach enhances filtering efficiency compared to 
gp_enable_runtime_filter.
   
   By pushing the filters closer to the data source, this new method achieves 
more efficient data filtering before the data reaches higher-level operators, 
potentially reducing the amount of data that needs to be processed by 
subsequent operations in the query execution plan.
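
The idea of filtering inside the scan loop can be sketched as follows. This is a simplified, self-contained model, not the PR's `SeqNext` or `PassByBloomFilter`. `ScanFilter`, `scan_filter_pass`, and `scan_with_filter` are hypothetical names, rows are plain `int64_t` join keys, and the toy Bloom filter uses two probes. The point it shows is that a row failing the min/max range check or the Bloom check is dropped inside the scan and never handed to the join above it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SCAN_BLOOM_BITS 1024u          /* power of two, so masking selects a bit */

/* Hypothetical per-column runtime filter: a range plus a toy Bloom filter. */
typedef struct ScanFilter
{
    uint64_t bloom[SCAN_BLOOM_BITS / 64u];
    int64_t  min;
    int64_t  max;
} ScanFilter;

/* splitmix64 finalizer, used here only as a cheap bit mixer. */
static uint64_t
scan_mix(uint64_t x)
{
    x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9ULL;
    x = (x ^ (x >> 27)) * 0x94d049bb133111ebULL;
    return x ^ (x >> 31);
}

static void
scan_positions(int64_t key, uint32_t *p1, uint32_t *p2)
{
    assert(p1 != NULL && p2 != NULL);
    *p1 = (uint32_t) (scan_mix((uint64_t) key) & (SCAN_BLOOM_BITS - 1u));
    *p2 = (uint32_t) (scan_mix((uint64_t) key ^ 0x9e3779b97f4a7c15ULL) & (SCAN_BLOOM_BITS - 1u));
}

/* Start with an empty range (min > max), so nothing passes until keys are added. */
static void
scan_filter_init(ScanFilter *f)
{
    memset(f->bloom, 0, sizeof(f->bloom));
    f->min = INT64_MAX;
    f->max = INT64_MIN;
}

/* Called for every key on the build (hash) side of the join. */
static void
scan_filter_add(ScanFilter *f, int64_t key)
{
    uint32_t p1, p2;

    scan_positions(key, &p1, &p2);
    f->bloom[p1 / 64u] |= UINT64_C(1) << (p1 % 64u);
    f->bloom[p2 / 64u] |= UINT64_C(1) << (p2 % 64u);
    if (key < f->min)
        f->min = key;
    if (key > f->max)
        f->max = key;
}

/* Cheap range check first, then the Bloom check. */
static bool
scan_filter_pass(const ScanFilter *f, int64_t key)
{
    uint32_t p1, p2;

    if (key < f->min || key > f->max)
        return false;

    scan_positions(key, &p1, &p2);
    return ((f->bloom[p1 / 64u] >> (p1 % 64u)) & 1u) &&
           ((f->bloom[p2 / 64u] >> (p2 % 64u)) & 1u);
}

/* Probe-side scan: copy only rows that pass the filter; return their count. */
static size_t
scan_with_filter(const int64_t *rows, size_t nrows, const ScanFilter *f, int64_t *out)
{
    size_t kept = 0;

    for (size_t i = 0; i < nrows; i++)
    {
        if (!scan_filter_pass(f, rows[i]))
            continue;                   /* dropped before it reaches the join */
        out[kept++] = rows[i];
    }
    return kept;
}
```

With build keys 1 to 4 and probe rows 5, 5, 5, 2, 9, 3, only 2 and 3 leave the scan: the other rows fall outside the range [1, 4] and are rejected before the Bloom filter is even consulted.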






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-30 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1864161626


##
src/test/regress/expected/gp_runtime_filter.out:
##
@@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf
   1600
 (1 row)
 
+-- Test bloom filter pushdown
+DROP TABLE IF EXISTS t1;
+NOTICE:  table "t1" does not exist, skipping
+DROP TABLE IF EXISTS t2;
+NOTICE:  table "t2" does not exist, skipping
+CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, 
orientation=column) distributed by (c1);
+CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, 
orientation=column) distributed REPLICATED;
+INSERT INTO t1 VALUES (5,5,5,5,5);
+INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+ANALYZE;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN 

+---
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+ Hash Cond: (t1.c2 = t2.c2)
+ Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 
524288 buckets.
+ ->  Seq Scan on t1 (actual rows=128 loops=1)
+ ->  Hash (actual rows=32 loops=1)
+   Buckets: 524288  Batches: 1  Memory Usage: 4098kB
+   ->  Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN 

+---
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+ Hash Cond: (t1.c2 = t2.c2)
+ Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 
524288 buckets.
+ ->  Seq Scan on t1 (actual rows=1 loops=1)
+ ->  Hash (actual rows=32 loops=1)
+   Buckets: 524288  Batches: 1  Memory Usage: 4098kB

Review Comment:
   Add debug message in 
https://github.com/apache/cloudberry/pull/724/commits/274d8aa1ff0b33a4d24e3a7765b493904c61dd60






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-29 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1864115997


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+    * Only applicable for inner, right and semi join,
+    */
+   jointype = hjstate->js.jointype;
+   if (jointype != JOIN_INNER
+   && jointype != JOIN_RIGHT
+   && jointype != JOIN_SEMI
+  )
+   return;
+
+   hstate = castNode(HashState, innerPlanState(hjstate));
+   hstate->filters = NIL;
+
+   /*
+* check and initialize the runtime filter for all hash conds in
+* hj->hashclauses
+*/
+   hj = castNode(HashJoin, hjstate->js.ps.plan);
+   foreach (lc, hj->hashclauses)
+   {
+   expr = (Expr *)lfirst(lc);
+
+   if (!IsEqualOp(expr))
+   continue;
+
+   lattno = -1;
+   rattno = -1;
+   if (!CheckEqualArgs(expr, &lattno, &rattno))
+   continue;
+
+   if (lattno < 1 || rattno < 1)
+   continue;
+
+   target = FindTargetAttr(hjstate, lattno, &lattno);
+   if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+   continue;
+   Assert(IsA(target, SeqScanState));
+
+   attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                  hstate->ps.plan->plan_rows);
+   if (attr_filter->blm_filter)
+   hstate->filters = lappend(hstate->filters, attr_filter);
+   else
+   pfree(attr_filter);
+   }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+   Oid funcid = InvalidOid;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   funcid = ((OpExpr *)expr)->opfuncid;
+   else if (IsA(expr, FuncExpr))
+   funcid = ((FuncExpr *)expr)->funcid;
+   else
+   return false;
+
+   if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+   || funcid == F_INT24EQ || funcid == F_INT42EQ
+   || funcid == F_INT28EQ || funcid == F_INT82EQ
+   || funcid == F_INT48EQ || funcid == F_INT84EQ
+  )
+   return true;
+
+   return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+   Var         *var;
+   bool         match;
+   List        *args;
+   ListCell    *lc;
+
+   if (lattno == NULL || rattno == NULL)
+   return false;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   args = ((OpExpr *)expr)->args;
+   else if (IsA(expr, FuncExpr))
+   args = ((FuncExpr *)expr)->args;
+   else
+   return false;
+
+   if (!args || list_length(args) != 2)
+   return false;
+
+   match = false;
+   foreach (lc, args)
+   {
+   match = false;
+
+   if (!IsA(lfirst(lc), Var))
+   break;
+
+   var = lfirst(lc);
+   if (var->varno == INNER_VAR)
+   *rattno = var->varattno;
+   else if (var->varno == OUTER_VAR)
+   *lattno = var->varattno;
+   else
+   break;
+
+   match = true;
+   }
+
+   return match;
+}
+
+/*
+ * it's just allowed like this:
+ *   HashJoin
+ *  ... a series of HashJoin nodes
+ *HashJoin
+ *  SeqScan <- target
+ */
+static PlanState *
+FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
+{
+   Var *var;
+   PlanState *child, *parent;
+   TargetEntry *te;
+
+   parent = (PlanState *)hjstate;
+   child  = outerPlanState(hjstate);
+   Assert(child);
+
+   *lattno = -1;
+   while (child)
+   {
+   /* target is seqscan */
+   if (IsA(child, SeqScanState))
+   {
+   te = (Ta

Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-26 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2503107713

   > Code in MultiExecParallelHash has changed; please add some parallel tests with the runtime filter.
   
   got it.





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-26 Thread via GitHub


avamingli commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2503099464

   Code in MultiExecParallelHash has changed; please add some parallel tests with the runtime filter.





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857762470


##
src/backend/executor/nodeHashjoin.c:
##
@@ -162,6 +164,16 @@ static void ReleaseHashTable(HashJoinState *node);
 static void SpillCurrentBatch(HashJoinState *node);
 static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate);
 static void ExecEagerFreeHashJoin(HashJoinState *node);
+static void CreateRuntimeFilter(HashJoinState* hjstate);
+static bool IsEqualOp(Expr *expr);
+static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno);
+static PlanState *FindTargetAttr(HashJoinState *hjstate,

Review Comment:
   Fixed in https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857762107


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+* Only applicable to inner, right and semi joins.
+*/
+   jointype = hjstate->js.jointype;
+   if (jointype != JOIN_INNER
+   && jointype != JOIN_RIGHT
+   && jointype != JOIN_SEMI
+  )
+   return;
+
+   hstate = castNode(HashState, innerPlanState(hjstate));
+   hstate->filters = NIL;
+
+   /*
+* check and initialize the runtime filter for all hash conds in
+* hj->hashclauses
+*/
+   hj = castNode(HashJoin, hjstate->js.ps.plan);
+   foreach (lc, hj->hashclauses)
+   {
+   expr = (Expr *)lfirst(lc);
+
+   if (!IsEqualOp(expr))
+   continue;
+
+   lattno = -1;
+   rattno = -1;
+   if (!CheckEqualArgs(expr, &lattno, &rattno))
+   continue;
+
+   if (lattno < 1 || rattno < 1)
+   continue;
+
+   target = FindTargetAttr(hjstate, lattno, &lattno);
+   if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+   continue;
+   Assert(IsA(target, SeqScanState));
+
+   attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                  hstate->ps.plan->plan_rows);
+   if (attr_filter->blm_filter)
+   hstate->filters = lappend(hstate->filters, attr_filter);
+   else
+   pfree(attr_filter);
+   }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+   Oid funcid = InvalidOid;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   funcid = ((OpExpr *)expr)->opfuncid;
+   else if (IsA(expr, FuncExpr))
+   funcid = ((FuncExpr *)expr)->funcid;
+   else
+   return false;
+
+   if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+   || funcid == F_INT24EQ || funcid == F_INT42EQ
+   || funcid == F_INT28EQ || funcid == F_INT82EQ
+   || funcid == F_INT48EQ || funcid == F_INT84EQ
+  )
+   return true;
+
+   return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+   Var      *var;
+   bool      match;
+   List     *args;
+   ListCell *lc;
+
+   if (lattno == NULL || rattno == NULL)
+   return false;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;

Review Comment:
   Fixed in https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf



##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
/*
 * get the next tuple from the table
 */
-   if (table_scan_getnextslot(scandesc, direction, slot))
+   while (table_scan_getnextslot(scandesc, direction, slot))
+   {
+   if (TupIsNull(slot))
+   return slot;

Review Comment:
   Fixed in https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf



##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclau

Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857761549


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   List        *scankeys;
+   ScanKey      sk;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   scankeys = NIL;
+
+   attr_filter = lfirst(lc);
+   if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+   continue;
+
+   /* bloom filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= SK_BLOOM_FILTER;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+   scankeys = lappend(scankeys, sk);
+
+   /* range filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTGreaterEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->min;
+   scankeys = lappend(scankeys, sk);
+
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTLessEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->max;
+   scankeys = lappend(scankeys, sk);
+
+   /* append new runtime filters to target node */
+   SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+   sss->filters = list_concat(sss->filters, scankeys);
+   }
+}
+
+static void
+BuildRuntimeFilter(HashState *node, TupleTableSlot *slot)
+{
+   Datum        val;
+   bool         isnull;
+   ListCell    *lc;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = (AttrFilter *) lfirst(lc);
+
+   val = slot_getattr(slot, attr_filter->rattno, &isnull);
+   if (isnull)
+   continue;
+
+   attr_filter->empty = false;
+
+   if ((int64_t)val < (int64_t)attr_filter->min)
+   attr_filter->min = val;
+
+   if ((int64_t)val > (int64_t)attr_filter->max)
+   attr_filter->max = val;
+
+   if (attr_filter->blm_filter)
+   bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum));
+   }
+}
+
+void
+FreeRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   AttrFilter  *attr_filter;
+
+   if (!node->filters)
+   return;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = lfirst(lc);
+   if (attr_filter->blm_filter)
+   bloom_free(attr_filter->blm_filter);
+   }
+
+   list_free_deep(node->filters);
+   node->filters = NIL;
+}
+
+void
+ResetRuntimeFilter(HashState *node)
+{
+   ListCell     *lc;
+   AttrFilter   *attr_filter;
+   SeqScanState *sss;
+
+   if (!node->filters)
+   return;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = lfirst(lc);
+   attr_filter->empty  = true;
+
+   if (IsA(attr_filter->target, SeqScanState))
+   {
+   sss = castNode(SeqScanState, attr_filter->target);
+   if (sss->filters)
+   {
+   list_free_deep(sss->filters);
+   sss->filters = NIL;
+   }
+   }
+
+   if (attr_filter->blm_filter)
+   bloom_free(attr_filter->blm_filter);
+
+   attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
+                                                    work_mem, random());
+   attr_filter->min = LLONG_MAX;
+   attr_filter->max = LLONG_MIN;

Review Comment:
   Fixed in https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf




Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857739192


##
src/backend/executor/nodeHashjoin.c:
##
@@ -162,6 +164,16 @@ static void ReleaseHashTable(HashJoinState *node);
 static void SpillCurrentBatch(HashJoinState *node);
 static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate);
 static void ExecEagerFreeHashJoin(HashJoinState *node);
+static void CreateRuntimeFilter(HashJoinState* hjstate);
+static bool IsEqualOp(Expr *expr);
+static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno);
+static PlanState *FindTargetAttr(HashJoinState *hjstate,

Review Comment:
   Good idea, I will fix it.






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857713706


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   List        *scankeys;
+   ScanKey      sk;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   scankeys = NIL;
+
+   attr_filter = lfirst(lc);
+   if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+   continue;
+
+   /* bloom filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= SK_BLOOM_FILTER;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+   scankeys = lappend(scankeys, sk);
+
+   /* range filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTGreaterEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->min;
+   scankeys = lappend(scankeys, sk);
+
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTLessEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->max;
+   scankeys = lappend(scankeys, sk);
+
+   /* append new runtime filters to target node */
+   SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+   sss->filters = list_concat(sss->filters, scankeys);
+   }
+}
+
+static void
+BuildRuntimeFilter(HashState *node, TupleTableSlot *slot)
+{
+   Datum        val;
+   bool         isnull;
+   ListCell    *lc;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = (AttrFilter *) lfirst(lc);
+
+   val = slot_getattr(slot, attr_filter->rattno, &isnull);
+   if (isnull)
+   continue;
+
+   attr_filter->empty = false;
+
+   if ((int64_t)val < (int64_t)attr_filter->min)
+   attr_filter->min = val;
+
+   if ((int64_t)val > (int64_t)attr_filter->max)
+   attr_filter->max = val;
+
+   if (attr_filter->blm_filter)
+   bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum));
+   }
+}
+
+void
+FreeRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   AttrFilter  *attr_filter;
+
+   if (!node->filters)
+   return;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = lfirst(lc);
+   if (attr_filter->blm_filter)
+   bloom_free(attr_filter->blm_filter);
+   }
+
+   list_free_deep(node->filters);
+   node->filters = NIL;
+}
+
+void
+ResetRuntimeFilter(HashState *node)
+{
+   ListCell     *lc;
+   AttrFilter   *attr_filter;
+   SeqScanState *sss;
+
+   if (!node->filters)
+   return;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = lfirst(lc);
+   attr_filter->empty  = true;
+
+   if (IsA(attr_filter->target, SeqScanState))
+   {
+   sss = castNode(SeqScanState, attr_filter->target);
+   if (sss->filters)
+   {
+   list_free_deep(sss->filters);
+   sss->filters = NIL;
+   }
+   }
+
+   if (attr_filter->blm_filter)
+   bloom_free(attr_filter->blm_filter);
+
+   attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
+                                                    work_mem, random());
+   attr_filter->min = LLONG_MAX;
+   attr_filter->max = LLONG_MIN;

Review Comment:
   Use LONG_MAX and LONG_MIN instead?




Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857711528


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   List        *scankeys;
+   ScanKey      sk;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   scankeys = NIL;
+
+   attr_filter = lfirst(lc);
+   if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+   continue;
+
+   /* bloom filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= SK_BLOOM_FILTER;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+   scankeys = lappend(scankeys, sk);
+
+   /* range filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTGreaterEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->min;
+   scankeys = lappend(scankeys, sk);
+
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTLessEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->max;
+   scankeys = lappend(scankeys, sk);
+
+   /* append new runtime filters to target node */
+   SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+   sss->filters = list_concat(sss->filters, scankeys);

Review Comment:
   got it






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857505809


##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
/*
 * get the next tuple from the table
 */
-   if (table_scan_getnextslot(scandesc, direction, slot))
+   while (table_scan_getnextslot(scandesc, direction, slot))
+   {
+   if (TupIsNull(slot))
+   return slot;
+
+   if (node->filter_in_seqscan && node->filters &&
+   !PassByBloomFilter(node, slot))

Review Comment:
   What is the difference between this and the bloom filter used by `gp_enable_runtime_filter`?
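
   For context on the hunk quoted above: the patch replaces the single
   `table_scan_getnextslot` call with a loop that keeps fetching until a tuple
   passes the pushed-down filters. The sketch below illustrates only that
   control flow in standalone C. `ToyBloom`, `mix64`, and `scan_with_filter`
   are hypothetical names invented for illustration; this is neither the PR's
   `PassByBloomFilter` nor PostgreSQL's bloom filter implementation.

   ```c
   #include <stdbool.h>
   #include <stddef.h>
   #include <stdint.h>
   #include <string.h>

   #define TOY_BLOOM_BITS 1024u

   /* Hypothetical, fixed-size bloom filter with two hash probes per key. */
   typedef struct ToyBloom
   {
       uint8_t bits[TOY_BLOOM_BITS / 8];
   } ToyBloom;

   /* 64-bit mixing function (splitmix64 finalizer). */
   static uint64_t
   mix64(uint64_t x)
   {
       x ^= x >> 30;
       x *= 0xbf58476d1ce4e5b9ULL;
       x ^= x >> 27;
       x *= 0x94d049bb133111ebULL;
       x ^= x >> 31;
       return x;
   }

   static void
   toy_bloom_init(ToyBloom *bf)
   {
       memset(bf->bits, 0, sizeof(bf->bits));
   }

   /* Build side: record every join key seen while filling the hash table. */
   static void
   toy_bloom_add(ToyBloom *bf, int64_t key)
   {
       uint64_t h = mix64((uint64_t) key);
       uint32_t h1 = (uint32_t) h % TOY_BLOOM_BITS;
       uint32_t h2 = (uint32_t) (h >> 32) % TOY_BLOOM_BITS;

       bf->bits[h1 / 8] |= (uint8_t) (1u << (h1 % 8));
       bf->bits[h2 / 8] |= (uint8_t) (1u << (h2 % 8));
   }

   /* Probe side: false means "definitely absent", true means "maybe present". */
   static bool
   toy_bloom_may_contain(const ToyBloom *bf, int64_t key)
   {
       uint64_t h = mix64((uint64_t) key);
       uint32_t h1 = (uint32_t) h % TOY_BLOOM_BITS;
       uint32_t h2 = (uint32_t) (h >> 32) % TOY_BLOOM_BITS;

       return (bf->bits[h1 / 8] & (1u << (h1 % 8))) != 0 &&
              (bf->bits[h2 / 8] & (1u << (h2 % 8))) != 0;
   }

   /*
    * Scan loop: skip rows the filter rejects instead of returning them.
    * A NULL filter means "no runtime filter installed", so every row passes.
    * Returns the number of rows copied into out.
    */
   static size_t
   scan_with_filter(const int64_t *rows, size_t nrows,
                    const ToyBloom *bf, int64_t *out)
   {
       size_t nout = 0;

       for (size_t i = 0; i < nrows; i++)
       {
           if (bf != NULL && !toy_bloom_may_contain(bf, rows[i]))
               continue;       /* cannot match any build-side key */
           out[nout++] = rows[i];
       }
       return nout;
   }
   ```

   Because a bloom filter can report false positives but never false
   negatives, rows that survive the scan still have to go through the join
   itself; the filter only reduces how many rows reach it.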






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857443716


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   List        *scankeys;
+   ScanKey      sk;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   scankeys = NIL;
+
+   attr_filter = lfirst(lc);
+   if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+   continue;
+
+   /* bloom filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= SK_BLOOM_FILTER;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+   scankeys = lappend(scankeys, sk);
+
+   /* range filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTGreaterEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->min;
+   scankeys = lappend(scankeys, sk);
+
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTLessEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->max;
+   scankeys = lappend(scankeys, sk);
+
+   /* append new runtime filters to target node */
+   SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+   sss->filters = list_concat(sss->filters, scankeys);

Review Comment:
   ```sql
   create table t1(a int, b int) with(parallel_workers=2);
   create table rt1(a int, b int) with(parallel_workers=2);
   create table rt2(a int, b int);
   create table rt3(a int, b int);
   insert into t1 select i, i from generate_series(1, 10) i;
   insert into t1 select i, i+1 from generate_series(1, 10) i;
   insert into rt1 select i, i+1 from generate_series(1, 10) i;
   insert into rt2 select i, i+1 from generate_series(1, 1) i;
   insert into rt3 select i, i+1 from generate_series(1, 10) i;
   analyze t1;
   analyze rt1;
   analyze rt2;
   analyze rt3;
   
   explain analyze select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;
   
   postgres=# explain select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b;
                                      QUERY PLAN
   ------------------------------------------------------------------------------
    Gather Motion 3:1  (slice1; segments: 3)  (cost=2.45..428.51 rows=17 width=24)
      ->  Hash Join  (cost=2.45..428.29 rows=6 width=24)
            Hash Cond: (t1.b = rt1.a)
            ->  Hash Join  (cost=1.23..427.00 rows=6 width=16)
                  Hash Cond: (t1.b = rt3.a)
                  ->  Seq Scan on t1  (cost=0.00..342.37 rows=7 width=8)
                  ->  Hash  (cost=1.10..1.10 rows=10 width=8)
                        ->  Seq Scan on rt3  (cost=0.00..1.10 rows=10 width=8)
            ->  Hash  (cost=1.10..1.10 rows=10 width=8)
                  ->  Seq Scan on rt1  (cost=0.00..1.10 rows=10 width=8)
    Optimizer: Postgres query optimizer
   (11 rows)
   
   ```
   You can try this case; it will produce two range filters on the same scan.
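
   One possible way to avoid stacking duplicate range keys in a case like
   this, sketched in standalone C under the assumption that the keys for one
   scan are kept in a small array: when a second join pushes a range on an
   attribute that already has one, intersect the two ranges instead of
   appending. `RangeKey` and `add_range_key` are hypothetical names and are
   not part of the PR.

   ```c
   #include <stddef.h>
   #include <stdint.h>

   /* Hypothetical range key: attribute number plus an inclusive [min, max]. */
   typedef struct RangeKey
   {
       int     attno;
       int64_t min;
       int64_t max;
   } RangeKey;

   /*
    * Add a range key to keys[0..nkeys). If a key for the same attribute
    * already exists, narrow it to the intersection of both ranges, because a
    * row must fall inside every pushed-down range to survive all the joins.
    * Returns the new number of keys.
    */
   static size_t
   add_range_key(RangeKey *keys, size_t nkeys, size_t capacity,
                 RangeKey incoming)
   {
       for (size_t i = 0; i < nkeys; i++)
       {
           if (keys[i].attno == incoming.attno)
           {
               if (incoming.min > keys[i].min)
                   keys[i].min = incoming.min;
               if (incoming.max < keys[i].max)
                   keys[i].max = incoming.max;
               return nkeys;
           }
       }

       /*
        * No key for this attribute yet. If the array is full the key is
        * dropped, which is safe: a runtime filter only prunes rows early and
        * the join still checks every row.
        */
       if (nkeys < capacity)
           keys[nkeys++] = incoming;
       return nkeys;
   }
   ```

   In the plan above both hash joins filter `t1.b`, so the two pushed-down
   ranges would collapse into a single pair of bounds on that attribute.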






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856688955


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   List        *scankeys;
+   ScanKey      sk;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   scankeys = NIL;
+
+   attr_filter = lfirst(lc);
+   if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+   continue;
+
+   /* bloom filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= SK_BLOOM_FILTER;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+   scankeys = lappend(scankeys, sk);
+
+   /* range filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTGreaterEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->min;
+   scankeys = lappend(scankeys, sk);
+
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTLessEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->max;
+   scankeys = lappend(scankeys, sk);
+
+   /* append new runtime filters to target node */
+   SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+   sss->filters = list_concat(sss->filters, scankeys);
+   }
+}
+
+static void
+BuildRuntimeFilter(HashState *node, TupleTableSlot *slot)
+{
+   Datum        val;
+   bool         isnull;
+   ListCell    *lc;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = (AttrFilter *) lfirst(lc);
+
+   val = slot_getattr(slot, attr_filter->rattno, &isnull);
+   if (isnull)
+   continue;
+
+   attr_filter->empty = false;
+
+   if ((int64_t)val < (int64_t)attr_filter->min)
+   attr_filter->min = val;
+
+   if ((int64_t)val > (int64_t)attr_filter->max)
+   attr_filter->max = val;
+
+   if (attr_filter->blm_filter)
+   bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum));
+   }
+}
+
+void
+FreeRuntimeFilter(HashState *node)
+{
+   ListCell    *lc;
+   AttrFilter  *attr_filter;
+
+   if (!node->filters)
+   return;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = lfirst(lc);
+   if (attr_filter->blm_filter)
+   bloom_free(attr_filter->blm_filter);
+   }
+
+   list_free_deep(node->filters);
+   node->filters = NIL;
+}
+
+void
+ResetRuntimeFilter(HashState *node)
+{
+   ListCell     *lc;
+   AttrFilter   *attr_filter;
+   SeqScanState *sss;
+
+   if (!node->filters)
+   return;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = lfirst(lc);
+   attr_filter->empty  = true;
+
+   if (IsA(attr_filter->target, SeqScanState))
+   {
+   sss = castNode(SeqScanState, attr_filter->target);
+   if (sss->filters)
+   {
+   list_free_deep(sss->filters);
+   sss->filters = NIL;
+   }
+   }
+
+   if (attr_filter->blm_filter)
+   bloom_free(attr_filter->blm_filter);
+
+   attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
+                                                    work_mem, random());
+   attr_filter->min = LLONG_MAX;
+   attr_filter->max = LLONG_MIN;

Review Comment:
   I see `StaticAssertDecl(SIZEOF_DATUM == 8, "sizeof datum is not 8");` in 
postgres.h, so it's better to use INT64_MAX/INT64_MIN here.
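
   To illustrate the point about sentinel values: the quoted code compares
   values as `int64_t`, so sentinels from `<stdint.h>` match the comparison
   type exactly, whereas `LONG_MAX` is only guaranteed to be at least
   2^31 - 1 and is 32 bits wide on LLP64 platforms. A minimal standalone
   sketch follows; `RangeFilter` and its helper functions are hypothetical
   stand-ins for the min/max part of `AttrFilter`, not the PR's code.

   ```c
   #include <stdbool.h>
   #include <stdint.h>

   /* Hypothetical stand-in for the min/max bookkeeping of a runtime filter. */
   typedef struct RangeFilter
   {
       bool    empty;  /* true until the first non-NULL value is added */
       int64_t min;
       int64_t max;
   } RangeFilter;

   /* Start with an inverted range so the first value replaces both bounds. */
   static void
   range_filter_reset(RangeFilter *rf)
   {
       rf->empty = true;
       rf->min = INT64_MAX;
       rf->max = INT64_MIN;
   }

   /* Widen the range to include val. */
   static void
   range_filter_add(RangeFilter *rf, int64_t val)
   {
       rf->empty = false;
       if (val < rf->min)
           rf->min = val;
       if (val > rf->max)
           rf->max = val;
   }

   /* A value outside [min, max] cannot match any build-side key. */
   static bool
   range_filter_may_match(const RangeFilter *rf, int64_t val)
   {
       return !rf->empty && val >= rf->min && val <= rf->max;
   }
   ```

   With 32-bit sentinels, a build side whose keys all exceed 2^31 - 1 would
   never lower `min` below the sentinel, so the resulting range would be
   wrong; the 64-bit limits avoid that regardless of platform.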




Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856568858


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+* Only applicatable for inner, right and semi join,
+*/
+   jointype = hjstate->js.jointype;
+   if (jointype != JOIN_INNER
+   && jointype != JOIN_RIGHT
+   && jointype != JOIN_SEMI
+  )
+   return;
+
+   hstate = castNode(HashState, innerPlanState(hjstate));
+   hstate->filters = NIL;
+
+   /*
+* check and initialize the runtime filter for all hash conds in
+* hj->hashclauses
+*/
+   hj = castNode(HashJoin, hjstate->js.ps.plan);
+   foreach (lc, hj->hashclauses)
+   {
+   expr = (Expr *)lfirst(lc);
+
+   if (!IsEqualOp(expr))
+   continue;
+
+   lattno = -1;
+   rattno = -1;
+   if (!CheckEqualArgs(expr, &lattno, &rattno))
+   continue;
+
+   if (lattno < 1 || rattno < 1)
+   continue;
+
+   target = FindTargetAttr(hjstate, lattno, &lattno);
+   if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+   continue;
+   Assert(IsA(target, SeqScanState));
+
+   attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                  hstate->ps.plan->plan_rows);
+   if (attr_filter->blm_filter)
+   hstate->filters = lappend(hstate->filters, attr_filter);
+   else
+   pfree(attr_filter);
+   }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+   Oid funcid = InvalidOid;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   funcid = ((OpExpr *)expr)->opfuncid;
+   else if (IsA(expr, FuncExpr))
+   funcid = ((FuncExpr *)expr)->funcid;
+   else
+   return false;
+
+   if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+   || funcid == F_INT24EQ || funcid == F_INT42EQ
+   || funcid == F_INT28EQ || funcid == F_INT82EQ
+   || funcid == F_INT48EQ || funcid == F_INT84EQ
+  )
+   return true;
+
+   return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+   Var         *var;
+   bool         match;
+   List        *args;
+   ListCell    *lc;
+
+   if (lattno == NULL || rattno == NULL)
+   return false;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;

Review Comment:
   fix it






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856560913


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+* Only applicatable for inner, right and semi join,
+*/
+   jointype = hjstate->js.jointype;
+   if (jointype != JOIN_INNER
+   && jointype != JOIN_RIGHT
+   && jointype != JOIN_SEMI
+  )
+   return;
+
+   hstate = castNode(HashState, innerPlanState(hjstate));
+   hstate->filters = NIL;
+
+   /*
+* check and initialize the runtime filter for all hash conds in
+* hj->hashclauses
+*/
+   hj = castNode(HashJoin, hjstate->js.ps.plan);
+   foreach (lc, hj->hashclauses)
+   {
+   expr = (Expr *)lfirst(lc);
+
+   if (!IsEqualOp(expr))
+   continue;
+
+   lattno = -1;
+   rattno = -1;
+   if (!CheckEqualArgs(expr, &lattno, &rattno))
+   continue;
+
+   if (lattno < 1 || rattno < 1)
+   continue;
+
+   target = FindTargetAttr(hjstate, lattno, &lattno);
+   if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+   continue;
+   Assert(IsA(target, SeqScanState));
+
+   attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                  hstate->ps.plan->plan_rows);
+   if (attr_filter->blm_filter)
+   hstate->filters = lappend(hstate->filters, attr_filter);
+   else
+   pfree(attr_filter);
+   }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+   Oid funcid = InvalidOid;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   funcid = ((OpExpr *)expr)->opfuncid;
+   else if (IsA(expr, FuncExpr))
+   funcid = ((FuncExpr *)expr)->funcid;
+   else
+   return false;
+
+   if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+   || funcid == F_INT24EQ || funcid == F_INT42EQ
+   || funcid == F_INT28EQ || funcid == F_INT82EQ
+   || funcid == F_INT48EQ || funcid == F_INT84EQ
+  )
+   return true;
+
+   return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+   Var         *var;
+   bool         match;
+   List        *args;
+   ListCell    *lc;
+
+   if (lattno == NULL || rattno == NULL)
+   return false;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   args = ((OpExpr *)expr)->args;
+   else if (IsA(expr, FuncExpr))
+   args = ((FuncExpr *)expr)->args;
+   else
+   return false;
+
+   if (!args || list_length(args) != 2)
+   return false;
+
+   match = false;
+   foreach (lc, args)
+   {
+   match = false;
+
+   if (!IsA(lfirst(lc), Var))
+   break;
+
+   var = lfirst(lc);
+   if (var->varno == INNER_VAR)
+   *rattno = var->varattno;
+   else if (var->varno == OUTER_VAR)
+   *lattno = var->varattno;
+   else
+   break;
+
+   match = true;
+   }
+
+   return match;

Review Comment:
   Refactor the code in a more intuitive way, as outlined below:
   ```
   /* check the first arg */
   ...
   
   /* check the second arg */
   ...
   
   return true;
   ```




Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-25 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856482708


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+   ListCell*lc;
+   List*scankeys;
+   ScanKey sk;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   scankeys = NIL;
+
+   attr_filter = lfirst(lc);
+   if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+   continue;
+
+   /* bloom filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= SK_BLOOM_FILTER;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+   scankeys = lappend(scankeys, sk);
+
+   /* range filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTGreaterEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->min;
+   scankeys = lappend(scankeys, sk);
+
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTLessEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->max;
+   scankeys = lappend(scankeys, sk);
+
+   /* append new runtime filters to target node */
+   SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+   sss->filters = list_concat(sss->filters, scankeys);

Review Comment:
   1. Combining Bloom filters results in a higher false positive rate (FPR) 
than using each Bloom filter separately, so it is not recommended.
   2. Combining range filters has the same problem as combining Bloom filters.
   3. In many cases there is only one Bloom filter and one range filter on 
the same attribute.
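
One way to read point 1, assuming "combining" means a bitwise-OR union of two equally sized filters that share their hash functions: the union has at least as many bits set as either input, and the share of set bits is what drives false positives. The toy filter below is purely illustrative (64 bits, two probe positions, hypothetical `toy_*` names) and is not the bloom filter implementation used by the PR.

```c
#include <assert.h>
#include <stdint.h>

/* Toy 64-bit Bloom filter with two probe positions per key. */
typedef uint64_t ToyBloom;

/* Bit mixer (splitmix64 finalizer), used only to spread key bits. */
static uint64_t
toy_mix(uint64_t x)
{
    x = (x ^ (x >> 30)) * UINT64_C(0xbf58476d1ce4e5b9);
    x = (x ^ (x >> 27)) * UINT64_C(0x94d049bb133111eb);
    return x ^ (x >> 31);
}

/* Bits that a key sets and later probes. */
static ToyBloom
toy_bloom_mask(uint64_t key)
{
    uint64_t h = toy_mix(key);

    return (UINT64_C(1) << (h & 63)) | (UINT64_C(1) << ((h >> 6) & 63));
}

static ToyBloom
toy_bloom_add(ToyBloom f, uint64_t key)
{
    return f | toy_bloom_mask(key);
}

/* Nonzero if the key may be present; zero if it is definitely absent. */
static int
toy_bloom_maybe_contains(ToyBloom f, uint64_t key)
{
    ToyBloom need = toy_bloom_mask(key);

    return (f & need) == need;
}

/* Number of set bits; more set bits means more false positives. */
static int
toy_popcount(ToyBloom f)
{
    int n = 0;

    for (; f != 0; f &= f - 1)
        n++;
    return n;
}
```

Every key that passes either input filter also passes `a | b`, so the union can only admit more non-members, never fewer.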






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-24 Thread via GitHub


gfphoenix78 commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855868906


##
src/test/regress/expected/gp_runtime_filter.out:
##
@@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf
   1600
 (1 row)
 
+-- Test bloom filter pushdown
+DROP TABLE IF EXISTS t1;
+NOTICE:  table "t1" does not exist, skipping
+DROP TABLE IF EXISTS t2;
+NOTICE:  table "t2" does not exist, skipping
+CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1);
+CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED;
+INSERT INTO t1 VALUES (5,5,5,5,5);
+INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+ANALYZE;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN 

+---
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+ Hash Cond: (t1.c2 = t2.c2)
+ Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+ ->  Seq Scan on t1 (actual rows=128 loops=1)
+ ->  Hash (actual rows=32 loops=1)
+   Buckets: 524288  Batches: 1  Memory Usage: 4098kB
+   ->  Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN 

+---
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+ Hash Cond: (t1.c2 = t2.c2)
+ Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+ ->  Seq Scan on t1 (actual rows=1 loops=1)
+ ->  Hash (actual rows=32 loops=1)
+   Buckets: 524288  Batches: 1  Memory Usage: 4098kB

Review Comment:
   > how to debug and get pushdown scankey here ?
   
   Add a debug message that dumps stats on how many tuples are filtered out?
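
A rough sketch of what such stats could look like, assuming a per-scan counter that is updated for every tuple tested against the pushed-down filters. `SketchFilterStats`, `sketch_record` and `sketch_format_stats` are hypothetical names; in the executor the formatted text would presumably go through the usual logging or EXPLAIN machinery rather than a plain buffer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical per-scan counters for runtime filter effectiveness. */
typedef struct SketchFilterStats
{
    uint64_t tested;    /* tuples checked against the filters */
    uint64_t filtered;  /* tuples rejected by the filters */
} SketchFilterStats;

/* Record the outcome of one filter check and pass the result through. */
static bool
sketch_record(SketchFilterStats *stats, bool passed)
{
    assert(stats != NULL);
    stats->tested++;
    if (!passed)
        stats->filtered++;
    return passed;
}

/* Format a one-line debug message into buf; returns snprintf's result. */
static int
sketch_format_stats(const SketchFilterStats *stats, char *buf, size_t buflen)
{
    return snprintf(buf, buflen,
                    "runtime filter: %llu of %llu rows filtered out",
                    (unsigned long long) stats->filtered,
                    (unsigned long long) stats->tested);
}
```

Such a message would make it possible to see in a test whether the pushed-down scan keys actually removed rows.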






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-24 Thread via GitHub


gfphoenix78 commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855867578


##
src/test/regress/expected/gp_runtime_filter.out:
##
@@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf
   1600
 (1 row)
 
+-- Test bloom filter pushdown
+DROP TABLE IF EXISTS t1;
+NOTICE:  table "t1" does not exist, skipping
+DROP TABLE IF EXISTS t2;
+NOTICE:  table "t2" does not exist, skipping
+CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1);
+CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED;
+INSERT INTO t1 VALUES (5,5,5,5,5);
+INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+ANALYZE;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN 

+---
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+ Hash Cond: (t1.c2 = t2.c2)
+ Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+ ->  Seq Scan on t1 (actual rows=128 loops=1)
+ ->  Hash (actual rows=32 loops=1)
+   Buckets: 524288  Batches: 1  Memory Usage: 4098kB
+   ->  Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN 

+---
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+ Hash Cond: (t1.c2 = t2.c2)
+ Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+ ->  Seq Scan on t1 (actual rows=1 loops=1)
+ ->  Hash (actual rows=32 loops=1)
+   Buckets: 524288  Batches: 1  Memory Usage: 4098kB

Review Comment:
   The test should also cover the case where a hash join node or a result 
node is the child of the parent hash join.






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-24 Thread via GitHub


gfphoenix78 commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855838595


##
src/backend/executor/nodeSeqscan.c:
##
@@ -365,3 +398,60 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
}
node->ss.ss_currentScanDesc = scandesc;
 }
+
+/*
+ * Returns true if the element may be in the bloom filter.
+ */
+static bool
+PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
+{
+   ScanKey      sk;
+   Datum        val;
+   bool         isnull;
+   ListCell    *lc;
+   bloom_filter *blm_filter;
+
+   foreach (lc, node->filters)
+   {
+   sk = lfirst(lc);
+   if (sk->sk_flags != SK_BLOOM_FILTER)
+   continue;
+
+   val = slot_getattr(slot, sk->sk_attno, &isnull);
+   if (isnull)
+   continue;
+
+   blm_filter = (bloom_filter *)DatumGetPointer(sk->sk_argument);
+   if (bloom_lacks_element(blm_filter, (unsigned char *)&val, sizeof(Datum)))
+   return false;
+   }
+
+   return true;
+}
+
+/*
+ * Convert the list of ScanKey to the array, and append an emtpy ScanKey as
+ * the end flag of the array.
+ */
+static ScanKey
+ScanKeyListToArray(List *keys, int *num)
+{
+   ScanKey sk;
+
+   if (list_length(keys) == 0)
+   return NULL;
+
+   Assert(num);
+   *num = list_length(keys);
+
+   sk = (ScanKey)palloc(sizeof(ScanKeyData) * (*num + 1));
+   for (int i = 0; i < *num; ++i)
+   memcpy(&sk[i], list_nth(keys, i), sizeof(ScanKeyData));
+
+   /*
+* SK_EMPYT means the end of the array of the ScanKey
+*/
+   sk[*num].sk_flags = SK_EMPYT;

Review Comment:
   How is the boundary of the `ScanKey` array checked in rescan? In a normal 
rescan, the number of `ScanKey`s is the same as in begin_scan. If the number of 
`ScanKey`s is larger in rescan than it was in begin_scan, the boundary value 
might be invalid and dangerous to access.
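
One possible way to address this concern, sketched under the assumption that the scan keeps an explicit key count and capacity instead of relying only on an end-marker entry. `SketchKey`, `SketchScan` and `sketch_scan_set_keys` are hypothetical stand-ins and do not reflect the table AM interface.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified scan key. */
typedef struct SketchKey
{
    int flags;
    int attno;
} SketchKey;

/* Hypothetical scan descriptor that tracks how many keys it can hold. */
typedef struct SketchScan
{
    SketchKey *keys;
    int        nkeys;     /* number of valid entries in keys */
    int        capacity;  /* number of allocated entries in keys */
} SketchScan;

/*
 * Install nkeys keys, growing the buffer first when a rescan brings more
 * keys than the previous call. Returns 0 on success, -1 on bad input or
 * allocation failure.
 */
static int
sketch_scan_set_keys(SketchScan *scan, const SketchKey *keys, int nkeys)
{
    if (scan == NULL || nkeys < 0 || (nkeys > 0 && keys == NULL))
        return -1;

    if (nkeys > scan->capacity)
    {
        SketchKey *grown = realloc(scan->keys, sizeof(SketchKey) * (size_t) nkeys);

        if (grown == NULL)
            return -1;
        scan->keys = grown;
        scan->capacity = nkeys;
    }

    if (nkeys > 0)
        memcpy(scan->keys, keys, sizeof(SketchKey) * (size_t) nkeys);
    scan->nkeys = nkeys;
    return 0;
}
```

With an explicit count, a reader never has to walk past the copied entries looking for a terminator that an older, smaller allocation may not contain.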






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-24 Thread via GitHub


gfphoenix78 commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855812576


##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
/*
 * get the next tuple from the table
 */
-   if (table_scan_getnextslot(scandesc, direction, slot))
+   while (table_scan_getnextslot(scandesc, direction, slot))
+   {
+   if (TupIsNull(slot))
+   return slot;

Review Comment:
   Can `TupIsNull(slot)` ever be true here, given that 
`table_scan_getnextslot` has just returned true?






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-24 Thread via GitHub


gfphoenix78 commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855741525


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+* Only applicatable for inner, right and semi join,
+*/
+   jointype = hjstate->js.jointype;
+   if (jointype != JOIN_INNER
+   && jointype != JOIN_RIGHT
+   && jointype != JOIN_SEMI
+  )
+   return;
+
+   hstate = castNode(HashState, innerPlanState(hjstate));
+   hstate->filters = NIL;
+
+   /*
+* check and initialize the runtime filter for all hash conds in
+* hj->hashclauses
+*/
+   hj = castNode(HashJoin, hjstate->js.ps.plan);
+   foreach (lc, hj->hashclauses)
+   {
+   expr = (Expr *)lfirst(lc);
+
+   if (!IsEqualOp(expr))
+   continue;
+
+   lattno = -1;
+   rattno = -1;
+   if (!CheckEqualArgs(expr, &lattno, &rattno))
+   continue;
+
+   if (lattno < 1 || rattno < 1)
+   continue;
+
+   target = FindTargetAttr(hjstate, lattno, &lattno);
+   if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+   continue;
+   Assert(IsA(target, SeqScanState));
+
+   attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                  hstate->ps.plan->plan_rows);
+   if (attr_filter->blm_filter)
+   hstate->filters = lappend(hstate->filters, attr_filter);
+   else
+   pfree(attr_filter);
+   }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+   Oid funcid = InvalidOid;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   funcid = ((OpExpr *)expr)->opfuncid;
+   else if (IsA(expr, FuncExpr))
+   funcid = ((FuncExpr *)expr)->funcid;
+   else
+   return false;
+
+   if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+   || funcid == F_INT24EQ || funcid == F_INT42EQ
+   || funcid == F_INT28EQ || funcid == F_INT82EQ
+   || funcid == F_INT48EQ || funcid == F_INT84EQ
+  )
+   return true;
+
+   return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+   Var         *var;
+   bool         match;
+   List        *args;
+   ListCell    *lc;
+
+   if (lattno == NULL || rattno == NULL)
+   return false;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   args = ((OpExpr *)expr)->args;
+   else if (IsA(expr, FuncExpr))
+   args = ((FuncExpr *)expr)->args;
+   else
+   return false;
+
+   if (!args || list_length(args) != 2)
+   return false;
+
+   match = false;
+   foreach (lc, args)
+   {
+   match = false;
+
+   if (!IsA(lfirst(lc), Var))
+   break;
+
+   var = lfirst(lc);
+   if (var->varno == INNER_VAR)
+   *rattno = var->varattno;
+   else if (var->varno == OUTER_VAR)
+   *lattno = var->varattno;
+   else
+   break;
+
+   match = true;
+   }
+
+   return match;

Review Comment:
   The `match` flag makes the code hard to read because it is modified in 
several places. Each `break` statement could be replaced by `return false;`. If 
the foreach loop runs to completion, all conditions matched, so the function 
can return true.
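
The early-return shape could look roughly like the sketch below. It uses simplified stand-in types so that it compiles on its own: `SketchVar`, `SKETCH_INNER_VAR`, `SKETCH_OUTER_VAR` and `sketch_check_equal_args` are hypothetical, and the constant values are arbitrary rather than PostgreSQL's definitions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Arbitrary stand-in values for the special varno constants. */
enum { SKETCH_INNER_VAR = -1, SKETCH_OUTER_VAR = -2 };

/* Simplified stand-in for an expression argument. */
typedef struct SketchVar
{
    bool is_var;    /* true when the argument is a plain Var */
    int  varno;
    int  varattno;
} SketchVar;

/*
 * Return true only when both arguments are Vars that refer to the inner
 * or outer side; any mismatch returns false immediately, so no flag is
 * needed.
 */
static bool
sketch_check_equal_args(const SketchVar *args, size_t nargs,
                        int *lattno, int *rattno)
{
    if (args == NULL || nargs != 2 || lattno == NULL || rattno == NULL)
        return false;

    for (size_t i = 0; i < nargs; i++)
    {
        if (!args[i].is_var)
            return false;

        if (args[i].varno == SKETCH_INNER_VAR)
            *rattno = args[i].varattno;
        else if (args[i].varno == SKETCH_OUTER_VAR)
            *lattno = args[i].varattno;
        else
            return false;
    }

    /* The loop finished, so every argument matched. */
    return true;
}
```

As in the original loop, the output parameters may already be partly written when the function returns false, so callers should only read them on success.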




Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-24 Thread via GitHub


gfphoenix78 commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855733105


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+* Only applicatable for inner, right and semi join,
+*/
+   jointype = hjstate->js.jointype;
+   if (jointype != JOIN_INNER
+   && jointype != JOIN_RIGHT
+   && jointype != JOIN_SEMI
+  )
+   return;
+
+   hstate = castNode(HashState, innerPlanState(hjstate));
+   hstate->filters = NIL;
+
+   /*
+* check and initialize the runtime filter for all hash conds in
+* hj->hashclauses
+*/
+   hj = castNode(HashJoin, hjstate->js.ps.plan);
+   foreach (lc, hj->hashclauses)
+   {
+   expr = (Expr *)lfirst(lc);
+
+   if (!IsEqualOp(expr))
+   continue;
+
+   lattno = -1;
+   rattno = -1;
+   if (!CheckEqualArgs(expr, &lattno, &rattno))
+   continue;
+
+   if (lattno < 1 || rattno < 1)
+   continue;
+
+   target = FindTargetAttr(hjstate, lattno, &lattno);
+   if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+   continue;
+   Assert(IsA(target, SeqScanState));
+
+   attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                  hstate->ps.plan->plan_rows);
+   if (attr_filter->blm_filter)
+   hstate->filters = lappend(hstate->filters, attr_filter);
+   else
+   pfree(attr_filter);
+   }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+   Oid funcid = InvalidOid;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;
+
+   if (IsA(expr, OpExpr))
+   funcid = ((OpExpr *)expr)->opfuncid;
+   else if (IsA(expr, FuncExpr))
+   funcid = ((FuncExpr *)expr)->funcid;
+   else
+   return false;
+
+   if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+   || funcid == F_INT24EQ || funcid == F_INT42EQ
+   || funcid == F_INT28EQ || funcid == F_INT82EQ
+   || funcid == F_INT48EQ || funcid == F_INT84EQ
+  )
+   return true;
+
+   return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+   Var         *var;
+   bool         match;
+   List        *args;
+   ListCell    *lc;
+
+   if (lattno == NULL || rattno == NULL)
+   return false;
+
+   if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+   return false;

Review Comment:
   These 2 lines duplicate the check in the following if/else-if/else code 
and could be deleted.






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-24 Thread via GitHub


gfphoenix78 commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855727314


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+   AttrNumber   lattno, rattno;
+   Expr        *expr;
+   JoinType     jointype;
+   HashJoin    *hj;
+   HashState   *hstate;
+   PlanState   *target;
+   AttrFilter  *attr_filter;
+   ListCell    *lc;
+
+   /*
+* Only applicatable for inner, right and semi join,
+*/

Review Comment:
   Could you explain in a little more detail why these join types are 
supported and others are not?
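
A plausible rationale, which is not stated in this thread and is offered only as an assumption: the filter is built from the inner (hashed) side and drops outer-side rows that cannot match, which is safe only when unmatched outer rows never appear in the join result. The sketch below encodes that rule with a hypothetical `SketchJoinType` enum rather than PostgreSQL's `JoinType`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical join kinds; "outer" is the probe side, "inner" is hashed. */
typedef enum SketchJoinType
{
    SKETCH_JOIN_INNER,
    SKETCH_JOIN_LEFT,   /* keeps unmatched outer rows, null-extended */
    SKETCH_JOIN_FULL,   /* keeps unmatched rows from both sides */
    SKETCH_JOIN_RIGHT,  /* keeps unmatched inner rows only */
    SKETCH_JOIN_SEMI,   /* emits outer rows that have a match */
    SKETCH_JOIN_ANTI    /* emits outer rows that have no match */
} SketchJoinType;

/*
 * True when an outer row without an inner match contributes nothing to
 * the result, so dropping it early in the scan cannot change the output.
 */
static bool
sketch_can_drop_unmatched_outer(SketchJoinType jointype)
{
    switch (jointype)
    {
        case SKETCH_JOIN_INNER:
        case SKETCH_JOIN_RIGHT:
        case SKETCH_JOIN_SEMI:
            return true;
        case SKETCH_JOIN_LEFT:
        case SKETCH_JOIN_FULL:
        case SKETCH_JOIN_ANTI:
            return false;
    }
    return false;
}
```

Under this reading, left, full and anti joins are excluded because they must still emit outer rows that the filter would have removed.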



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org



Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-24 Thread via GitHub


gfphoenix78 commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855726047


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+   ListCell*lc;
+   List*scankeys;
+   ScanKey sk;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   scankeys = NIL;
+
+   attr_filter = lfirst(lc);
+   if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+   continue;
+
+   /* bloom filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= SK_BLOOM_FILTER;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+   scankeys = lappend(scankeys, sk);
+
+   /* range filter */
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTGreaterEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->min;
+   scankeys = lappend(scankeys, sk);
+
+   sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+   sk->sk_flags= 0;
+   sk->sk_attno= attr_filter->lattno;
+   sk->sk_strategy = BTLessEqualStrategyNumber;
+   sk->sk_subtype  = INT8OID;
+   sk->sk_argument = attr_filter->max;
+   scankeys = lappend(scankeys, sk);
+
+   /* append new runtime filters to target node */
+   SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+   sss->filters = list_concat(sss->filters, scankeys);
+   }
+}
+
+static void
+BuildRuntimeFilter(HashState *node, TupleTableSlot *slot)
+{
+   Datum val;
+   bool  isnull;
+   ListCell*lc;
+   AttrFilter  *attr_filter;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = (AttrFilter *) lfirst(lc);
+
+   val = slot_getattr(slot, attr_filter->rattno, &isnull);
+   if (isnull)
+   continue;
+
+   attr_filter->empty = false;
+
+   if ((int64_t)val < (int64_t)attr_filter->min)
+   attr_filter->min = val;
+
+   if ((int64_t)val > (int64_t)attr_filter->max)
+   attr_filter->max = val;
+
+   if (attr_filter->blm_filter)
+   bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum));
+   }
+}
+
+void
+FreeRuntimeFilter(HashState *node)
+{
+   ListCell*lc;
+   AttrFilter  *attr_filter;
+
+   if (!node->filters)
+   return;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = lfirst(lc);
+   if (attr_filter->blm_filter)
+   bloom_free(attr_filter->blm_filter);
+   }
+
+   list_free_deep(node->filters);
+   node->filters = NIL;
+}
+
+void
+ResetRuntimeFilter(HashState *node)
+{
+   ListCell*lc;
+   AttrFilter  *attr_filter;
+   SeqScanState*sss;
+
+   if (!node->filters)
+   return;
+
+   foreach (lc, node->filters)
+   {
+   attr_filter = lfirst(lc);
+   attr_filter->empty  = true;
+
+   if (IsA(attr_filter->target, SeqScanState))
+   {
+   sss = castNode(SeqScanState, attr_filter->target);
+   if (sss->filters)
+   {
+   list_free_deep(sss->filters);
+   sss->filters = NIL;
+   }
+   }
+
+   if (attr_filter->blm_filter)
+   bloom_free(attr_filter->blm_filter);
+
+   attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
+                                                    work_mem, random());
+   attr_filter->min = LLONG_MAX;
+   attr_filter->max = LLONG_MIN;

Review Comment:
   `LLONG_MAX` and `LLONG_MIN` are platform-specific values, i.e. the bounds of 
`long long`, which may not be exactly the same width as `Datum`. For safety, a 
static assert could be considered.
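
A minimal sketch of such a guard, using C11 `_Static_assert` and a hypothetical `SketchDatum` typedef that is assumed to be 8 bytes wide. The names are illustrative and do not come from the PR.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Hypothetical stand-in for a pointer-sized datum, assumed to be 8 bytes. */
typedef uint64_t SketchDatum;

/* Fail the build, not the query, if the width assumptions do not hold. */
_Static_assert(sizeof(SketchDatum) == 8, "sketch datum is not 8 bytes");
_Static_assert(sizeof(long long) == sizeof(SketchDatum),
               "long long and the sketch datum differ in width");

/*
 * Compare two datums as signed 64-bit integers. Converting a value above
 * INT64_MAX back to int64_t is implementation-defined before C23; common
 * compilers wrap it as two's complement.
 */
static int
sketch_datum_less(SketchDatum a, SketchDatum b)
{
    return (int64_t) a < (int64_t) b;
}
```

With the assertions in place, `LLONG_MAX`/`LLONG_MIN` and `INT64_MAX`/`INT64_MIN` are known to coincide on any platform where the file compiles.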




Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-22 Thread via GitHub


zhangyue-hashdata closed pull request #724: Push the runtime filter from HashJoin down to SeqScan or AM.
URL: https://github.com/apache/cloudberry/pull/724





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-22 Thread via GitHub


zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2493470306

   > Looks interesting. And I have some questions to discuss.
   > 
   > * Besides the seqscan, can the runtime filter apply to other types of scan, 
such as the index scan?
   > * It looks like the runtime filter can only be used when the `hashjoin` node 
and the `seqscan` node run in the same process, which means the tables must have 
the same distribution policy on the join columns, or one of the tables must be 
replicated.
   
   





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-22 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853724139


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
         ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
     }
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+    AttrNumber  lattno, rattno;
+    Expr        *expr;
+    JoinType    jointype;
+    HashJoin    *hj;
+    HashState   *hstate;
+    PlanState   *target;
+    AttrFilter  *attr_filter;
+    ListCell    *lc;
+
+    /*
+     * Only applicable to inner, right and semi joins.
+     */
+    jointype = hjstate->js.jointype;
+    if (jointype != JOIN_INNER
+        && jointype != JOIN_RIGHT
+        && jointype != JOIN_SEMI
+       )
+        return;
+
+    hstate = castNode(HashState, innerPlanState(hjstate));
+    hstate->filters = NIL;
+
+    /*
+     * check and initialize the runtime filter for all hash conds in
+     * hj->hashclauses
+     */
+    hj = castNode(HashJoin, hjstate->js.ps.plan);
+    foreach (lc, hj->hashclauses)
+    {
+        expr = (Expr *)lfirst(lc);
+
+        if (!IsEqualOp(expr))
+            continue;
+
+        lattno = -1;
+        rattno = -1;
+        if (!CheckEqualArgs(expr, &lattno, &rattno))
+            continue;
+
+        if (lattno < 1 || rattno < 1)
+            continue;
+
+        target = FindTargetAttr(hjstate, lattno, &lattno);
+        if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+            continue;
+        Assert(IsA(target, SeqScanState));
+
+        attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                       hstate->ps.plan->plan_rows);
+        if (attr_filter->blm_filter)
+            hstate->filters = lappend(hstate->filters, attr_filter);
+        else
+            pfree(attr_filter);
+    }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+    Oid funcid = InvalidOid;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        funcid = ((OpExpr *)expr)->opfuncid;
+    else if (IsA(expr, FuncExpr))
+        funcid = ((FuncExpr *)expr)->funcid;
+    else
+        return false;
+
+    if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+        || funcid == F_INT24EQ || funcid == F_INT42EQ
+        || funcid == F_INT28EQ || funcid == F_INT82EQ
+        || funcid == F_INT48EQ || funcid == F_INT84EQ
+       )
+        return true;
+
+    return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+    Var         *var;
+    bool        match;
+    List        *args;
+    ListCell    *lc;
+
+    if (lattno == NULL || rattno == NULL)
+        return false;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        args = ((OpExpr *)expr)->args;
+    else if (IsA(expr, FuncExpr))
+        args = ((FuncExpr *)expr)->args;
+    else
+        return false;
+
+    if (!args || list_length(args) != 2)
+        return false;
+
+    match = false;
+    foreach (lc, args)
+    {
+        match = false;
+
+        if (!IsA(lfirst(lc), Var))
+            break;
+
+        var = lfirst(lc);
+        if (var->varno == INNER_VAR)
+            *rattno = var->varattno;
+        else if (var->varno == OUTER_VAR)
+            *lattno = var->varattno;
+        else
+            break;
+
+        match = true;
+    }
+
+    return match;
+}
+
+/*
+ * it's just allowed like this:
+ *   HashJoin
+ *     ... a series of HashJoin nodes
+ *       HashJoin
+ *         SeqScan <- target
+ */
+static PlanState *
+FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
+{
+    Var         *var;
+    PlanState   *child, *parent;
+    TargetEntry *te;
+
+    parent = (PlanState *)hjstate;
+    child  = outerPlanState(hjstate);
+    Assert(child);
+
+    *lattno = -1;
+    while (child)
+    {
+        /* target is seqscan */
+        if (IsA(child, SeqScanState))
+        {
+            te = (Ta

Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-22 Thread via GitHub


zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853719676


##
src/backend/executor/nodeSeqscan.c:
##
@@ -365,3 +398,60 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
     }
     node->ss.ss_currentScanDesc = scandesc;
 }
+
+/*
+ * Returns true if the element may be in the bloom filter.
+ */
+static bool
+PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
+{
+    ScanKey         sk;
+    Datum           val;
+    bool            isnull;
+    ListCell        *lc;
+    bloom_filter    *blm_filter;
+
+    foreach (lc, node->filters)
+    {
+        sk = lfirst(lc);
+        if (sk->sk_flags != SK_BLOOM_FILTER)
+            continue;
+
+        val = slot_getattr(slot, sk->sk_attno, &isnull);
+        if (isnull)

Review Comment:
   I will fix it.






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-22 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853703384


##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
         ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
     }
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+    AttrNumber  lattno, rattno;
+    Expr        *expr;
+    JoinType    jointype;
+    HashJoin    *hj;
+    HashState   *hstate;
+    PlanState   *target;
+    AttrFilter  *attr_filter;
+    ListCell    *lc;
+
+    /*
+     * Only applicable to inner, right and semi joins.
+     */
+    jointype = hjstate->js.jointype;
+    if (jointype != JOIN_INNER
+        && jointype != JOIN_RIGHT
+        && jointype != JOIN_SEMI
+       )
+        return;
+
+    hstate = castNode(HashState, innerPlanState(hjstate));
+    hstate->filters = NIL;
+
+    /*
+     * check and initialize the runtime filter for all hash conds in
+     * hj->hashclauses
+     */
+    hj = castNode(HashJoin, hjstate->js.ps.plan);
+    foreach (lc, hj->hashclauses)
+    {
+        expr = (Expr *)lfirst(lc);
+
+        if (!IsEqualOp(expr))
+            continue;
+
+        lattno = -1;
+        rattno = -1;
+        if (!CheckEqualArgs(expr, &lattno, &rattno))
+            continue;
+
+        if (lattno < 1 || rattno < 1)
+            continue;
+
+        target = FindTargetAttr(hjstate, lattno, &lattno);
+        if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+            continue;
+        Assert(IsA(target, SeqScanState));
+
+        attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                       hstate->ps.plan->plan_rows);
+        if (attr_filter->blm_filter)
+            hstate->filters = lappend(hstate->filters, attr_filter);
+        else
+            pfree(attr_filter);
+    }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+    Oid funcid = InvalidOid;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        funcid = ((OpExpr *)expr)->opfuncid;
+    else if (IsA(expr, FuncExpr))
+        funcid = ((FuncExpr *)expr)->funcid;
+    else
+        return false;
+
+    if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+        || funcid == F_INT24EQ || funcid == F_INT42EQ
+        || funcid == F_INT28EQ || funcid == F_INT82EQ
+        || funcid == F_INT48EQ || funcid == F_INT84EQ
+       )
+        return true;
+
+    return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+    Var         *var;
+    bool        match;
+    List        *args;
+    ListCell    *lc;
+
+    if (lattno == NULL || rattno == NULL)
+        return false;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        args = ((OpExpr *)expr)->args;
+    else if (IsA(expr, FuncExpr))
+        args = ((FuncExpr *)expr)->args;
+    else
+        return false;
+
+    if (!args || list_length(args) != 2)
+        return false;
+
+    match = false;
+    foreach (lc, args)
+    {
+        match = false;
+
+        if (!IsA(lfirst(lc), Var))
+            break;
+
+        var = lfirst(lc);
+        if (var->varno == INNER_VAR)
+            *rattno = var->varattno;
+        else if (var->varno == OUTER_VAR)
+            *lattno = var->varattno;
+        else
+            break;
+
+        match = true;
+    }
+
+    return match;
+}
+
+/*
+ * it's just allowed like this:
+ *   HashJoin
+ *     ... a series of HashJoin nodes
+ *       HashJoin
+ *         SeqScan <- target
+ */
+static PlanState *
+FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
+{
+    Var         *var;
+    PlanState   *child, *parent;
+    TargetEntry *te;
+
+    parent = (PlanState *)hjstate;
+    child  = outerPlanState(hjstate);
+    Assert(child);
+
+    *lattno = -1;
+    while (child)
+    {
+        /* target is seqscan */
+        if (IsA(child, SeqScanState))
+        {
+            te = (TargetEntry 

Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-21 Thread via GitHub


fanfuxiaoran commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2492889570

   Looks interesting. And I have some questions to discuss.
   - Besides the seqscan, can the runtime filter apply to other types of scan, 
such as the index scan?
   - It looks like the runtime filter can only be used when the `hashjoin` node 
and the `seqscan` node run in the same process, which means the tables must have 
the same distribution policy on the join columns, or one of the tables must be 
replicated.
   





Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-21 Thread via GitHub


fanfuxiaoran commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853241023


##
src/backend/executor/nodeHashjoin.c:
##
@@ -162,6 +164,16 @@ static void ReleaseHashTable(HashJoinState *node);
 static void SpillCurrentBatch(HashJoinState *node);
 static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate);
 static void ExecEagerFreeHashJoin(HashJoinState *node);
+static void CreateRuntimeFilter(HashJoinState* hjstate);
+static bool IsEqualOp(Expr *expr);
+static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno);
+static PlanState *FindTargetAttr(HashJoinState *hjstate,

Review Comment:
   It looks like `FindTargetNode` would be a better name.



##
src/backend/executor/nodeHashjoin.c:
##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
         ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
     }
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+    AttrNumber  lattno, rattno;
+    Expr        *expr;
+    JoinType    jointype;
+    HashJoin    *hj;
+    HashState   *hstate;
+    PlanState   *target;
+    AttrFilter  *attr_filter;
+    ListCell    *lc;
+
+    /*
+     * Only applicable to inner, right and semi joins.
+     */
+    jointype = hjstate->js.jointype;
+    if (jointype != JOIN_INNER
+        && jointype != JOIN_RIGHT
+        && jointype != JOIN_SEMI
+       )
+        return;
+
+    hstate = castNode(HashState, innerPlanState(hjstate));
+    hstate->filters = NIL;
+
+    /*
+     * check and initialize the runtime filter for all hash conds in
+     * hj->hashclauses
+     */
+    hj = castNode(HashJoin, hjstate->js.ps.plan);
+    foreach (lc, hj->hashclauses)
+    {
+        expr = (Expr *)lfirst(lc);
+
+        if (!IsEqualOp(expr))
+            continue;
+
+        lattno = -1;
+        rattno = -1;
+        if (!CheckEqualArgs(expr, &lattno, &rattno))
+            continue;
+
+        if (lattno < 1 || rattno < 1)
+            continue;
+
+        target = FindTargetAttr(hjstate, lattno, &lattno);
+        if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+            continue;
+        Assert(IsA(target, SeqScanState));
+
+        attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                       hstate->ps.plan->plan_rows);
+        if (attr_filter->blm_filter)
+            hstate->filters = lappend(hstate->filters, attr_filter);
+        else
+            pfree(attr_filter);
+    }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+    Oid funcid = InvalidOid;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        funcid = ((OpExpr *)expr)->opfuncid;
+    else if (IsA(expr, FuncExpr))
+        funcid = ((FuncExpr *)expr)->funcid;
+    else
+        return false;
+
+    if (funcid == F_INT2EQ  || funcid == F_INT4EQ  || funcid == F_INT8EQ
+        || funcid == F_INT24EQ || funcid == F_INT42EQ
+        || funcid == F_INT28EQ || funcid == F_INT82EQ
+        || funcid == F_INT48EQ || funcid == F_INT84EQ
+       )
+        return true;
+
+    return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+    Var         *var;
+    bool        match;
+    List        *args;
+    ListCell    *lc;
+
+    if (lattno == NULL || rattno == NULL)
+        return false;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        args = ((OpExpr *)expr)->args;
+    else if (IsA(expr, FuncExpr))
+        args = ((FuncExpr *)expr)->args;
+    else
+        return false;
+
+    if (!args || list_length(args) != 2)
+        return false;
+
+    match = false;
+    foreach (lc, args)
+    {
+        match = false;
+
+        if (!IsA(lfirst(lc), Var))

Review Comment:
   Could it support other expressions, where one argument is a column attribute 
and the other is a constant?




Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-21 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853188280


##
src/test/regress/expected/gp_runtime_filter.out:
##
@@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf
   1600
 (1 row)
 
+-- Test bloom filter pushdown
+DROP TABLE IF EXISTS t1;
+NOTICE:  table "t1" does not exist, skipping
+DROP TABLE IF EXISTS t2;
+NOTICE:  table "t2" does not exist, skipping
+CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1);
+CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED;
+INSERT INTO t1 VALUES (5,5,5,5,5);
+INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+ANALYZE;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+                                        QUERY PLAN
+-------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         ->  Seq Scan on t1 (actual rows=128 loops=1)
+         ->  Hash (actual rows=32 loops=1)
+               Buckets: 524288  Batches: 1  Memory Usage: 4098kB
+               ->  Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+                                        QUERY PLAN
+-------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2)   Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         ->  Seq Scan on t1 (actual rows=1 loops=1)
+         ->  Hash (actual rows=32 loops=1)
+               Buckets: 524288  Batches: 1  Memory Usage: 4098kB

Review Comment:
   How can we debug this and see the pushed-down scankey here?






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-21 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853185274


##
src/backend/executor/nodeSeqscan.c:
##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
     /*
      * get the next tuple from the table
      */
-    if (table_scan_getnextslot(scandesc, direction, slot))
+    while (table_scan_getnextslot(scandesc, direction, slot))
+    {
+        if (TupIsNull(slot))
+            return slot;
+
+        if (node->filter_in_seqscan && node->filters &&
+            !PassByBloomFilter(node, slot))

Review Comment:
   With TPC-DS at 1 TB, will the bloom filter lose efficacy, or fail to be 
created, because of the large number of rows?
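
For context on this question, the textbook approximation for the false-positive rate of a Bloom filter may help. Here $m$ is the number of bits, $k$ the number of hash functions and $n$ the number of inserted elements; these symbols are not from the PR. The sketch assumes the filter size is capped by the `work_mem` argument passed at creation, as is the case for PostgreSQL's `bloom_create`.

```latex
p \approx \left(1 - e^{-kn/m}\right)^{k},
\qquad
k_{\mathrm{opt}} = \frac{m}{n}\ln 2
\;\Longrightarrow\;
p_{\min} \approx \left(\tfrac{1}{2}\right)^{k_{\mathrm{opt}}} \approx 0.6185^{\,m/n}
```

With 8 bits per element this gives roughly a 2% false-positive rate. If $m$ is capped and the build side grows until only about 1 bit per element is left, the rate rises to roughly 60%, so the filter would still be created but would reject few tuples.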






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-21 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853184388


##
src/backend/executor/nodeSeqscan.c:
##
@@ -365,3 +398,60 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
     }
     node->ss.ss_currentScanDesc = scandesc;
 }
+
+/*
+ * Returns true if the element may be in the bloom filter.
+ */
+static bool
+PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
+{
+    ScanKey         sk;
+    Datum           val;
+    bool            isnull;
+    ListCell        *lc;
+    bloom_filter    *blm_filter;
+
+    foreach (lc, node->filters)
+    {
+        sk = lfirst(lc);
+        if (sk->sk_flags != SK_BLOOM_FILTER)
+            continue;
+
+        val = slot_getattr(slot, sk->sk_attno, &isnull);
+        if (isnull)

Review Comment:
   ```sql
   CREATE TABLE distinct_1(a int);
   CREATE TABLE distinct_2(a int);
   INSERT INTO distinct_1 VALUES(1),(2),(NULL);
   INSERT INTO distinct_2 VALUES(1),(NULL);
   SELECT * FROM distinct_1, distinct_2 WHERE distinct_1.a IS NOT DISTINCT FROM distinct_2.a;
   ```
   This test returns a wrong result.
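
One plausible reading of this failure is that the scan-side filter drops tuples whose join column is NULL. That is safe under plain `=`, where NULL never matches, but under `IS NOT DISTINCT FROM` a NULL on one side matches a NULL on the other. A minimal sketch of the distinction follows; every name in it is hypothetical, and a plain array stands in for the Bloom filter.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical nullable integer, standing in for a (Datum, isnull) pair. */
typedef struct NullableInt
{
    int  value;
    bool isnull;
} NullableInt;

/* Hypothetical summary of the build (hashed) side of the join. */
typedef struct BuildSummary
{
    const NullableInt *rows;
    size_t             nrows;
} BuildSummary;

/* True if any non-NULL build row equals v. */
static bool
build_contains(const BuildSummary *b, int v)
{
    for (size_t i = 0; i < b->nrows; i++)
        if (!b->rows[i].isnull && b->rows[i].value == v)
            return true;
    return false;
}

/* True if the build side holds at least one NULL. */
static bool
build_has_null(const BuildSummary *b)
{
    for (size_t i = 0; i < b->nrows; i++)
        if (b->rows[i].isnull)
            return true;
    return false;
}

/*
 * Returns true if the probed tuple may still join and must be kept.
 * null_safe is true for IS NOT DISTINCT FROM and false for plain "=".
 */
static bool
pass_filter(const BuildSummary *b, NullableInt probe, bool null_safe)
{
    assert(b != NULL);
    if (probe.isnull)
        return null_safe && build_has_null(b);
    return build_contains(b, probe.value);
}
```

With `distinct_2` as the build side, the probed values 1 and NULL must survive the filter for the query above, while 2 can be rejected. Treating `distinct_1` as the probed side is an assumption here.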






Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]

2024-11-21 Thread via GitHub


yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853183808


##
src/backend/executor/nodeHash.c:
##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
 
     return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+    ListCell    *lc;
+    List        *scankeys;
+    ScanKey     sk;
+    AttrFilter  *attr_filter;
+
+    foreach (lc, node->filters)
+    {
+        scankeys = NIL;
+
+        attr_filter = lfirst(lc);
+        if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+            continue;
+
+        /* bloom filter */
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = SK_BLOOM_FILTER;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+        scankeys = lappend(scankeys, sk);
+
+        /* range filter */
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = 0;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_strategy = BTGreaterEqualStrategyNumber;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = attr_filter->min;
+        scankeys = lappend(scankeys, sk);
+
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = 0;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_strategy = BTLessEqualStrategyNumber;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = attr_filter->max;
+        scankeys = lappend(scankeys, sk);
+
+        /* append new runtime filters to target node */
+        SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+        sss->filters = list_concat(sss->filters, scankeys);

Review Comment:
   Can we merge filters on the same attno here?
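
A minimal sketch of one way the range part could be merged, assuming all pushed-down filters are conjunctive (a tuple must pass every one of them). Two ranges on the same attribute can then be replaced by their intersection. `RangeKey` and both helpers are hypothetical and not part of the PR; the Bloom filters are created with different random seeds in the quoted code, so they would presumably stay as separate keys.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical range filter: keep tuples with min <= value <= max. */
typedef struct RangeKey
{
    int     attno;
    int64_t min;
    int64_t max;
} RangeKey;

/*
 * Merge src into keys[0..nkeys). If a key on the same attno exists, narrow
 * it to the intersection of both ranges; otherwise append src.
 * Returns the new number of keys. The caller must provide enough capacity.
 */
static size_t
merge_range_key(RangeKey *keys, size_t nkeys, size_t capacity, RangeKey src)
{
    for (size_t i = 0; i < nkeys; i++)
    {
        if (keys[i].attno == src.attno)
        {
            if (src.min > keys[i].min)
                keys[i].min = src.min;   /* raise the lower bound */
            if (src.max < keys[i].max)
                keys[i].max = src.max;   /* lower the upper bound */
            return nkeys;
        }
    }

    assert(nkeys < capacity);
    keys[nkeys] = src;
    return nkeys + 1;
}

/* An empty intersection means no tuple can pass the merged filter. */
static bool
range_is_empty(const RangeKey *k)
{
    return k->min > k->max;
}
```

Merging keeps the number of per-tuple comparisons at two per attribute regardless of how many joins push a range down, and an empty intersection lets the scan know early that nothing can match.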


