Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2577713757

> > I have retested the performance of tpcds 10s using the previously mentioned testing method. Please see the description part for the latest results
>
> cool, what about tpcds 100 sf ?

Please see the description part for the results of tpcds 100s.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2572320826

> I have retested the performance of tpcds 10s using the previously mentioned testing method. Please see the description part for the latest results

Cool, what about tpcds 100 sf?
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2572299954

> > from tpcds 10s details table, there are some bad cases.
>
> **21,24,30,42,49,54,68-1,99**
>
> I retested these SQL statements that exhibited performance regression, and the latest test results show no noticeable performance difference when toggling **gp_enable_runtime_filter_pushdown**. So, I speculate that the performance regression in these SQL statements might be associated with testing method. Previously I tested by running the entire suite of 99 TPC-DS queries with **gp_enable_runtime_filter_pushdown** enabled, and then again with it disabled.
>
> Therefore, a more appropriate method would be to execute the same SQL statement multiple times with **gp_enable_runtime_filter_pushdown** both enabled and disabled, respectively, and then take the average of those runs for comparison. I will follow this testing method for retesting and observe if there's any performance regression.

I have retested the performance of tpcds 10s using the previously mentioned testing method. Please see the description part for the latest results.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2567252216

> from tpcds 10s details table, there are some bad cases.

**21,24,30,42,49,54,68-1,99**

I retested the SQL statements that exhibited a performance regression, and the latest results show no noticeable performance difference when toggling **gp_enable_runtime_filter_pushdown**. So I suspect the regression in these statements is an artifact of the testing method. Previously I ran the entire suite of 99 TPC-DS queries once with **gp_enable_runtime_filter_pushdown** enabled, and then once again with it disabled.

A more appropriate method is to execute the same SQL statement multiple times with **gp_enable_runtime_filter_pushdown** enabled, multiple times with it disabled, and then compare the averages of the two sets of runs. I will retest with this method and check whether any performance regression remains.
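The comparison described above (repeat each query with the setting on and off, then compare the averages) boils down to a small piece of arithmetic. The following is only a sketch of that arithmetic under stated assumptions: the function names `mean_ms` and `relative_change_pct` are hypothetical, and the timings would have to be collected separately, for example from repeated runs of one TPC-DS query.

```c
#include <assert.h>
#include <stddef.h>

/* Arithmetic mean of n timing samples in milliseconds; n must be > 0. */
static double
mean_ms(const double *samples, size_t n)
{
    double sum = 0.0;

    assert(n > 0);
    for (size_t i = 0; i < n; i++)
        sum += samples[i];
    return sum / (double) n;
}

/*
 * Relative change, in percent, of the mean runtime with pushdown enabled
 * ("on") against the mean runtime with pushdown disabled ("off").
 * A negative result means the query ran faster with pushdown enabled.
 */
static double
relative_change_pct(const double *off, size_t n_off,
                    const double *on, size_t n_on)
{
    double base = mean_ms(off, n_off);

    assert(base > 0.0);
    return (mean_ms(on, n_on) - base) / base * 100.0;
}
```

Averaging several runs per setting reduces the influence of cache warm-up and run order, which a single pass over the whole suite cannot separate from the effect of the setting itself.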
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2556016039

From the tpcds 10s details table, there are some bad cases.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1885681270

## src/backend/executor/nodeSeqscan.c: ##
@@ -87,8 +101,14 @@ SeqNext(SeqScanState *node)
     /*
      * get the next tuple from the table
      */
-    if (table_scan_getnextslot(scandesc, direction, slot))
+    while (table_scan_getnextslot(scandesc, direction, slot))
+    {
+        if (node->filter_in_seqscan && node->filters &&

Review Comment:
Good idea! Fixed in https://github.com/apache/cloudberry/pull/724/commits/bcf93e66f72a35db0ab5694a92c59c768423b1ab
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1879796311

## src/backend/executor/nodeSeqscan.c: ##
@@ -87,8 +101,14 @@ SeqNext(SeqScanState *node)
     /*
      * get the next tuple from the table
      */
-    if (table_scan_getnextslot(scandesc, direction, slot))
+    while (table_scan_getnextslot(scandesc, direction, slot))
+    {
+        if (node->filter_in_seqscan && node->filters &&

Review Comment:
```c++
if (!node->filter_in_seqscan || !node->filters)
{
    if (table_scan_getnextslot(scandesc, direction, slot))
        return slot;
}
else
{
    while (table_scan_getnextslot(scandesc, direction, slot))
    {
        /* ... */
    }
}
```
Would this make the original path more efficient and readable?
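To make the suggested control flow concrete, here is a minimal, self-contained sketch of the same idea outside PostgreSQL. Everything in it is a stand-in and an assumption: `MockScan`, `mock_getnext`, `mock_seq_next` and `keep_even` are hypothetical names, an array of integers plays the role of the table, and a plain predicate plays the role of the Bloom filter check. It is not the code from the PR.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the runtime filter check on one row. */
typedef bool (*RowFilter)(int value);

typedef struct MockScan
{
    const int  *rows;    /* stand-in for the table being scanned */
    size_t      nrows;
    size_t      pos;     /* next row to return */
    RowFilter   filter;  /* NULL when no runtime filter was pushed down */
} MockScan;

/* Stand-in for table_scan_getnextslot(): fetch the next row, if any. */
static bool
mock_getnext(MockScan *scan, int *out)
{
    if (scan->pos >= scan->nrows)
        return false;
    *out = scan->rows[scan->pos++];
    return true;
}

/* Fetch the next row that survives the filter. */
static bool
mock_seq_next(MockScan *scan, int *out)
{
    /* Original path: no filter, so a single fetch and no loop. */
    if (scan->filter == NULL)
        return mock_getnext(scan, out);

    /* Filtered path: skip rows until one passes the filter. */
    while (mock_getnext(scan, out))
    {
        if (scan->filter(*out))
            return true;
    }
    return false;
}

/* Example filter used only for illustration: keep even values. */
static bool
keep_even(int v)
{
    return v % 2 == 0;
}
```

Splitting the two paths keeps the unfiltered scan free of any per-row filter test, which is the efficiency point raised in the review comment.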
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1878231460

## src/test/regress/expected/gp_runtime_filter.out: ##
@@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf
   1600
 (1 row)
 
+-- Test bloom filter pushdown
+DROP TABLE IF EXISTS t1;
+NOTICE: table "t1" does not exist, skipping
+DROP TABLE IF EXISTS t2;
+NOTICE: table "t2" does not exist, skipping
+CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1);
+CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED;
+INSERT INTO t1 VALUES (5,5,5,5,5);
+INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+ANALYZE;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN
+---
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1)
+   -> Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         -> Seq Scan on t1 (actual rows=128 loops=1)
+         -> Hash (actual rows=32 loops=1)
+               Buckets: 524288 Batches: 1 Memory Usage: 4098kB
+               -> Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN
+---
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1)
+   -> Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         -> Seq Scan on t1 (actual rows=1 loops=1)
+         -> Hash (actual rows=32 loops=1)
+               Buckets: 524288 Batches: 1 Memory Usage: 4098kB

Review Comment:
> The test case should cover that the hash join node or result node is the child of the parent hash join.

Fixed: test cases added in https://github.com/apache/cloudberry/pull/724/commits/5885683b5563a6335029a34d6ae1429cf178c78a and https://github.com/apache/cloudberry/pull/724/commits/768f6230d051580f9f16f0e2e1d9395b3d2faacf
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2530888254

> There are codes changed in MultiExecParallelHash, please add some parallel tests with runtime filter.

Fixed in https://github.com/apache/cloudberry/pull/724/commits/7ab040ae178e9bb616bd75e0488aa6a2293d4183
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2525105120

> Hi @zhangyue-hashdata I see that previous runtime filter implementation relies on some cost model at try_runtime_filter(). Do I understand it correctly, that this PR does not do any cost evaluation? Also for TPC-H/TPC-DS can you provide results for each query separately?
>
> Asking mostly out of curiosity, I see here are quite a few reviewers here already :)

Basically, you're correct. Our goal is to filter out as much data as possible right at the point where it is produced, that is, in the scan. A full cost evaluation for that would be very complex, so we only make a simple estimate based on the row count and work memory when creating the Bloom filter.

Furthermore, I have placed the detailed test results for TPC-DS 10s in the PR description.
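As an illustration of what "a simple estimate based on the row count and work memory" can look like, the sketch below sizes a Bloom filter bitset from an estimated row count and a memory budget. It is an assumption, not the PR's code: the function name `sketch_bloom_bytes` is hypothetical, and the constants (about two bytes per expected element, a 1 MB floor) are chosen to resemble the heuristic in PostgreSQL's `bloom_create()`, which additionally rounds the size to a power of two.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Pick a bitset size in bytes for a Bloom filter (illustrative only).
 *
 *   est_rows     estimated number of rows on the hash (build) side
 *   work_mem_kb  memory budget in kilobytes
 *
 * Aim for roughly two bytes per expected element, never exceed the
 * budget, and never go below 1 MB. As in the heuristic this is modeled
 * on, the floor wins when the budget is smaller than 1 MB.
 */
static uint64_t
sketch_bloom_bytes(uint64_t est_rows, uint64_t work_mem_kb)
{
    const uint64_t floor_bytes = UINT64_C(1024) * 1024;
    uint64_t budget = work_mem_kb * UINT64_C(1024);
    uint64_t wanted = est_rows * 2;
    uint64_t bytes = wanted < budget ? wanted : budget;

    if (bytes < floor_bytes)
        bytes = floor_bytes;
    return bytes;
}
```

With a large build side and a small budget the filter is capped by memory, so its false-positive rate rises; a real cost model would weigh that against the rows it still removes, which is the evaluation the reply describes as too complex to do at this point.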
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
fanfuxiaoran commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1872817255

## src/backend/executor/nodeSeqscan.c: ##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
     /*
      * get the next tuple from the table
      */
-    if (table_scan_getnextslot(scandesc, direction, slot))
+    while (table_scan_getnextslot(scandesc, direction, slot))
+    {
+        if (TupIsNull(slot))
+            return slot;
+
+        if (node->filter_in_seqscan && node->filters &&
+            !PassByBloomFilter(node, slot))

Review Comment:
> It determines whether using a Bloom filter for filtering data would be effective based on this evaluation

That makes sense, but where is the related code? I didn't see it in this PR. Does it compare the number of rows output by the hash table with the number of rows in the probe table? If the hash table has far fewer rows than the probe table, is the runtime filter then used?
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
Smyatkin-Maxim commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2520691766

Hi @zhangyue-hashdata, I see that the previous runtime filter implementation relies on a cost model in try_runtime_filter(). Do I understand correctly that this PR does not do any cost evaluation? Also, for TPC-H/TPC-DS, can you provide results for each query separately?

Asking mostly out of curiosity; I see there are quite a few reviewers here already :)
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2520535244

> ```
> explain analyze
> SELECT count(t1.c3) FROM t1, t3 WHERE t1.c1 = t3.c1 ;
>                                   QUERY PLAN
> ------------------------------------------------------------------------------
>  Finalize Aggregate (cost=1700.07..1700.08 rows=1 width=8) (actual time=32119.566..32119.571 rows=1 loops=1)
>    -> Gather Motion 3:1 (slice1; segments: 3) (cost=1700.02..1700.07 rows=3 width=8) (actual time=30.967..32119.550 rows=3 loops=1)
>          -> Partial Aggregate (cost=1700.02..1700.03 rows=1 width=8) (actual time=32119.131..32119.135 rows=1 loops=1)
>                -> Hash Join (cost=771.01..1616.68 rows=4 width=4) (actual time=14.059..32116.962 rows=33462 loops=1)
>                      Hash Cond: (t3.c1 = t1.c1)
>                      Extra Text: (seg0) Hash chain length 1.0 avg, 3 max, using 32439 of 524288 buckets.
>                      -> Seq Scan on t3 (cost=0.00..387.34 rows=4 width=4) (actual time=0.028..32089.490 rows=33462 loops=1)
>                      -> Hash (cost=354.34..354.34 rows=4 width=8) (actual time=13.257..13.259 rows=33462 loops=1)
>                            Buckets: 524288 Batches: 1 Memory Usage: 5404kB
>                            -> Seq Scan on t1 (cost=0.00..354.34 rows=4 width=8) (actual time=0.180..4.877 rows=33462 loops=1)
>  Planning Time: 0.227 ms
> ```
>
> runtime_filter has been pushed down to t3 table seqscan, but 'explain analyze' doesn't print them out.
>
> ```
> \d t1
>                  Table "public.t1"
>  Column |  Type   | Collation | Nullable | Default
> --------+---------+-----------+----------+---------
>  c1     | integer |           |          |
>  c2     | integer |           |          |
>  c3     | integer |           |          |
>  c4     | integer |           |          |
>  c5     | integer |           |          |
> Checksum: t
> Indexes:
>     "t1_c2" btree (c2)
> Distributed by: (c1)
> ```
>
> ```
> \d t3
>                  Table "public.t3"
>  Column |  Type   | Collation | Nullable | Default
> --------+---------+-----------+----------+---------
>  c1     | integer |           |          |
>  c2     | integer |           |          |
>  c3     | integer |           |          |
>  c4     | integer |           |          |
>  c5     | integer |           |          |
> Distributed by: (c1)
> ```

Thanks for your test case. Based on it, I rewrote the code so that the debug info is always displayed, even when the number of filtered rows is zero. I also added the test case to gp_runtime_filter.sql.

Fixed in https://github.com/apache/cloudberry/commit/98dac6dfc7d5e44e111aa16bdf5948d07ee2eb00
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724:
URL: https://github.com/apache/cloudberry/pull/724#discussion_r1871535784

## src/backend/executor/nodeHashjoin.c: ##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
         ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
     }
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+    AttrNumber  lattno, rattno;
+    Expr       *expr;
+    JoinType    jointype;
+    HashJoin   *hj;
+    HashState  *hstate;
+    PlanState  *target;
+    AttrFilter *attr_filter;
+    ListCell   *lc;
+
+    /*
+     * Only applicatable for inner, right and semi join,
+     */

Review Comment:
I added more comments explaining why only inner, right and semi joins are allowed with the runtime filter. Fixed in https://github.com/apache/cloudberry/commit/98dac6dfc7d5e44e111aa16bdf5948d07ee2eb00
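The restriction to inner, right and semi joins can be motivated as follows; this is the usual reasoning for such filters and an assumption about what the added comment says, since the comment text itself is not shown in this thread. The filter is built from the inner (hash) side and discards outer (probe) rows that cannot have a match. That is only safe when an unmatched outer row contributes nothing to the result. Left, full and anti joins must still emit unmatched outer rows, so filtering them early would change the result. The sketch below uses a hypothetical enum and function name, not PostgreSQL's own `JoinType`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical join type enum for illustration; not PostgreSQL's JoinType. */
typedef enum SketchJoinType
{
    SK_JOIN_INNER,
    SK_JOIN_LEFT,
    SK_JOIN_FULL,
    SK_JOIN_RIGHT,
    SK_JOIN_SEMI,
    SK_JOIN_ANTI
} SketchJoinType;

/*
 * Return true when outer (probe side) rows without a match on the inner
 * (hash side) never appear in the join result, so a filter built from
 * the hash side may drop them at the scan.
 */
static bool
sketch_can_push_runtime_filter(SketchJoinType jointype)
{
    switch (jointype)
    {
        case SK_JOIN_INNER:   /* unmatched outer rows are discarded */
        case SK_JOIN_RIGHT:   /* only unmatched inner rows are preserved */
        case SK_JOIN_SEMI:    /* outer rows are emitted only if a match exists */
            return true;
        default:              /* left, full, anti: unmatched outer rows matter */
            return false;
    }
}
```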
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2520542895

> Hi, with gp_enable_runtime_filter_pushdown = on, execute SQL below will get a crash:
>
> ```sql
> gpadmin=# show gp_enable_runtime_filter_pushdown;
>  gp_enable_runtime_filter_pushdown
> -----------------------------------
>  on
> (1 row)
> ```
>
> ```sql
> CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist);
> -- use fillfactor so we don't have to load too much data to get multiple pages
>
> -- Changed the column length in order to match the expected results based on relation's blocksz
> INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i;
> INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i;
> INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i;
> EXPLAIN (COSTS OFF)
> SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2);
> FATAL: Unexpected internal error (assert.c:48)
> DETAIL: FailedAssertion("IsA(planstate, SeqScanState)", File: "explain.c", Line: 4154)
> server closed the connection unexpectedly
>     This probably means the server terminated abnormally
>     before or while processing the request.
> The connection to the server was lost. Attempting reset: Succeeded.
> psql (14.4, server 14.4)
> ```

Thanks for your test case. I fixed it in https://github.com/apache/cloudberry/commit/98dac6dfc7d5e44e111aa16bdf5948d07ee2eb00 and added the test case to gp_runtime_filter.sql too.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
fanfuxiaoran commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2519492174

```
explain analyze
SELECT count(t1.c3) FROM t1, t3 WHERE t1.c1 = t3.c1 ;
                                  QUERY PLAN
------------------------------------------------------------------------------
 Finalize Aggregate (cost=1700.07..1700.08 rows=1 width=8) (actual time=32119.566..32119.571 rows=1 loops=1)
   -> Gather Motion 3:1 (slice1; segments: 3) (cost=1700.02..1700.07 rows=3 width=8) (actual time=30.967..32119.550 rows=3 loops=1)
         -> Partial Aggregate (cost=1700.02..1700.03 rows=1 width=8) (actual time=32119.131..32119.135 rows=1 loops=1)
               -> Hash Join (cost=771.01..1616.68 rows=4 width=4) (actual time=14.059..32116.962 rows=33462 loops=1)
                     Hash Cond: (t3.c1 = t1.c1)
                     Extra Text: (seg0) Hash chain length 1.0 avg, 3 max, using 32439 of 524288 buckets.
                     -> Seq Scan on t3 (cost=0.00..387.34 rows=4 width=4) (actual time=0.028..32089.490 rows=33462 loops=1)
                     -> Hash (cost=354.34..354.34 rows=4 width=8) (actual time=13.257..13.259 rows=33462 loops=1)
                           Buckets: 524288 Batches: 1 Memory Usage: 5404kB
                           -> Seq Scan on t1 (cost=0.00..354.34 rows=4 width=8) (actual time=0.180..4.877 rows=33462 loops=1)
 Planning Time: 0.227 ms
```

The runtime filter has been pushed down to the seqscan on table t3, but 'explain analyze' doesn't print it out.

```
\d t1
                 Table "public.t1"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 c1     | integer |           |          |
 c2     | integer |           |          |
 c3     | integer |           |          |
 c4     | integer |           |          |
 c5     | integer |           |          |
Checksum: t
Indexes:
    "t1_c2" btree (c2)
Distributed by: (c1)
```

```
\d t3
                 Table "public.t3"
 Column |  Type   | Collation | Nullable | Default
--------+---------+-----------+----------+---------
 c1     | integer |           |          |
 c2     | integer |           |          |
 c3     | integer |           |          |
 c4     | integer |           |          |
 c5     | integer |           |          |
Distributed by: (c1)
```
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
fanfuxiaoran commented on PR #724:
URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2519482732

Thanks for your detailed explanation.

> > Looks interesting. And I have some questions to discuss.
> >
> > * Beside the seqscan, can the runtime filter apply to other types of scan? such as the index scan.
> > * Looks only when the `hashjoin` node and `seqscan` node run in the same process can use the runtime filter. Which means the tables should have same distributed policy on the join columns or one of the table is replicated.
>
> > * Beside the seqscan, can the runtime filter apply to other types of scan? such as the index scan.
>
> Theoretically, it is feasible to apply runtime filters to operators such as Index Scan. However, because Index Scan already reduces data volume by leveraging an optimized storage structure, the performance gains from applying runtime filters to Index Scan would likely be minimal. Thus, I think that applying runtime filters to Index Scan would not yield significant performance benefits.

Makes sense. When doing a hash join, index scans and index-only scans are often not used on the probe node.

> In subsequent work, when we discover that other scan operators can achieve notable performance improvements from pushdown runtime filters, we will support these operators. Our focus will be on operators where runtime filters can substantially decrease the amount of data processed early in the query execution, leading to more pronounced performance enhancements.
>
> > * Looks only when the `hashjoin` node and `seqscan` node run in the same process can use the runtime filter. Which means the tables should have same distributed policy on the join columns or one of the table is replicated.
>
> Yes, the current pushdown runtime filter only supports in-process pushdown, which means that the Hash Join and SeqScan need to be within the same process. The design and implementation of cross-process pushdown runtime filters are much more complex.
>
> This limitation arises because coordinating and sharing data structures like Bloom filters or other runtime filters across different processes involves additional challenges such as inter-process communication (IPC), synchronization, and ensuring consistency and efficiency of the filters across process boundaries. Addressing these issues requires a more sophisticated design that can handle the complexities of distributed computing environments.

Exactly, and any lock used to solve that problem may even lead to worse performance.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724: URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2519287605

> ```sql
> gpadmin=# show gp_enable_runtime_filter_pushdown;
>  gp_enable_runtime_filter_pushdown
> ---
>  on
> (1 row)
> ```
>
> ```sql
> CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist);
> -- use fillfactor so we don't have to load too much data to get multiple pages
>
> -- Changed the column length in order to match the expected results based on relation's blocksz
> INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i;
> INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i;
> INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i;
> EXPLAIN (COSTS OFF)
> SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2);
> FATAL: Unexpected internal error (assert.c:48)
> DETAIL: FailedAssertion("IsA(planstate, SeqScanState)", File: "explain.c", Line: 4154)
> server closed the connection unexpectedly
> This probably means the server terminated abnormally
> before or while processing the request.
> The connection to the server was lost. Attempting reset: Succeeded.
> psql (14.4, server 14.4)
> ```

Thanks, I'll reproduce the issue and fix it.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
avamingli commented on PR #724: URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2519274062

Hi, with gp_enable_runtime_filter_pushdown = on, executing the SQL below crashes the server:

```sql
gpadmin=# show gp_enable_runtime_filter_pushdown;
 gp_enable_runtime_filter_pushdown
---
 on
(1 row)
```

```sql
CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist);
-- use fillfactor so we don't have to load too much data to get multiple pages

-- Changed the column length in order to match the expected results based on relation's blocksz
INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i;
INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i;
INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i;
EXPLAIN (COSTS OFF)
SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2);
FATAL: Unexpected internal error (assert.c:48)
DETAIL: FailedAssertion("IsA(planstate, SeqScanState)", File: "explain.c", Line: 4154)
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Succeeded.
psql (14.4, server 14.4)
```

https://github.com/user-attachments/assets/a8882208-1800-4df3-9f2d-7c659e24aaa3
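The assertion text in the report above suggests that code in explain.c assumes a `SeqScanState`, while a `TABLESAMPLE` query produces a different kind of scan node. The following is a minimal, self-contained sketch of the usual defensive pattern: test the node tag before downcasting, and skip node types that never carry pushed-down filters. The `Toy*` types and `toy_scan_filters` are hypothetical stand-ins and not Cloudberry code; the actual fix in the PR may look different.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for PostgreSQL's NodeTag / PlanState hierarchy. */
typedef enum ToyNodeTag
{
    TOY_SEQSCAN_STATE,
    TOY_SAMPLESCAN_STATE
} ToyNodeTag;

typedef struct ToyPlanState
{
    ToyNodeTag  tag;        /* first field, so the tag can be read from any node */
} ToyPlanState;

typedef struct ToySeqScanState
{
    ToyPlanState ps;        /* must stay first so the downcast below is valid */
    int          nfilters;  /* number of pushed-down runtime filters */
} ToySeqScanState;

/*
 * Return the number of pushed-down filters to report, or 0 for node types
 * that never receive them. The tag is tested before the cast, so a sample
 * scan is skipped instead of tripping an assertion.
 */
int
toy_scan_filters(const ToyPlanState *planstate)
{
    if (planstate == NULL || planstate->tag != TOY_SEQSCAN_STATE)
        return 0;

    return ((const ToySeqScanState *) planstate)->nfilters;
}
```

With this shape, the reporting code treats "not a sequential scan" as a normal case rather than as an invariant violation.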
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
fanfuxiaoran commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1870569206 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || funcid 
== F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + boolmatch; + List*args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + args = ((OpExpr *)expr)->args; + else if (IsA(expr, FuncExpr)) + args = ((FuncExpr *)expr)->args; + else + return false; + + if (!args || list_length(args) != 2) + return false; + + match = false; + foreach (lc, args) + { + match = false; + + if (!IsA(lfirst(lc), Var)) Review Comment: Sorry, I didn't make it clear. I don't mean the predication on the var. like the below sql ``` EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = (t2.c2 + 10); QUERY PLAN --- Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) -> Hash Join (actual rows=0 loops=1) Hash Cond: (t1.c2 = (t2.c2 + 10)) Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. -> Seq Scan on t1 (actual rows=128 loops=1) -> Hash (actual rows=32 loops=1) Buckets: 524288 Batches: 1 Memory Usage: 4098kB -> Seq Scan on t2 (actual rows=32 loops=1) Optimizer: Postgres query optimizer (9 rows) ``` As `t2.c2 + 10` is not a `Var` but a `T_OpExpr` , the runtime filter cannot ha
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724: URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2517322395

> Looks interesting. And I have some questions to discuss.
>
> * Beside the seqscan, can the runtime filter apply to other types of scan? such as the index scan.
> * Looks only when the `hashjoin` node and `seqscan` node run in the same process can use the runtime filter. Which means the tables should have same distributed policy on the join columns or one of the table is replicated.

* Beside the seqscan, can the runtime filter apply to other types of scan? such as the index scan.

> Theoretically, runtime filters can be applied to operators such as Index Scan. However, because Index Scan already reduces data volume by leveraging an optimized storage structure, I think the additional performance gain from applying runtime filters to it would likely be minimal.

> In subsequent work, if we find that other scan operators can achieve notable performance improvements from pushed-down runtime filters, we will support them. Our focus will be on operators where runtime filters can substantially reduce the amount of data processed early in query execution.

* Looks only when the `hashjoin` node and `seqscan` node run in the same process can use the runtime filter. Which means the tables should have same distributed policy on the join columns or one of the table is replicated.

> Yes, the current pushdown runtime filter only supports in-process pushdown, which means that the Hash Join and the SeqScan need to be within the same process. The design and implementation of cross-process pushdown runtime filters are much more complex.

> This limitation arises because sharing data structures such as Bloom filters or other runtime filters across processes involves additional challenges: inter-process communication (IPC), synchronization, and keeping the filters consistent and efficient across process boundaries. Addressing these issues requires a more sophisticated design.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1869442905

## src/backend/executor/nodeHashjoin.c: ##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
         ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
     }
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+    AttrNumber  lattno, rattno;
+    Expr       *expr;
+    JoinType    jointype;
+    HashJoin   *hj;
+    HashState  *hstate;
+    PlanState  *target;
+    AttrFilter *attr_filter;
+    ListCell   *lc;
+
+    /*
+     * Only applicatable for inner, right and semi join,
+     */
+    jointype = hjstate->js.jointype;
+    if (jointype != JOIN_INNER
+        && jointype != JOIN_RIGHT
+        && jointype != JOIN_SEMI
+        )
+        return;
+
+    hstate = castNode(HashState, innerPlanState(hjstate));
+    hstate->filters = NIL;
+
+    /*
+     * check and initialize the runtime filter for all hash conds in
+     * hj->hashclauses
+     */
+    hj = castNode(HashJoin, hjstate->js.ps.plan);
+    foreach (lc, hj->hashclauses)
+    {
+        expr = (Expr *)lfirst(lc);
+
+        if (!IsEqualOp(expr))
+            continue;
+
+        lattno = -1;
+        rattno = -1;
+        if (!CheckEqualArgs(expr, &lattno, &rattno))
+            continue;
+
+        if (lattno < 1 || rattno < 1)
+            continue;
+
+        target = FindTargetAttr(hjstate, lattno, &lattno);
+        if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+            continue;
+        Assert(IsA(target, SeqScanState));
+
+        attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                       hstate->ps.plan->plan_rows);
+        if (attr_filter->blm_filter)
+            hstate->filters = lappend(hstate->filters, attr_filter);
+        else
+            pfree(attr_filter);
+    }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+    Oid funcid = InvalidOid;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        funcid = ((OpExpr *)expr)->opfuncid;
+    else if (IsA(expr, FuncExpr))
+        funcid = ((FuncExpr *)expr)->funcid;
+    else
+        return false;
+
+    if (funcid == F_INT2EQ || funcid == F_INT4EQ || funcid == F_INT8EQ
+        || funcid == F_INT24EQ || funcid == F_INT42EQ
+        || funcid == F_INT28EQ || funcid == F_INT82EQ
+        || funcid == F_INT48EQ || funcid == F_INT84EQ
+        )
+        return true;
+
+    return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+    Var        *var;
+    bool        match;
+    List       *args;
+    ListCell   *lc;
+
+    if (lattno == NULL || rattno == NULL)
+        return false;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        args = ((OpExpr *)expr)->args;
+    else if (IsA(expr, FuncExpr))
+        args = ((FuncExpr *)expr)->args;
+    else
+        return false;
+
+    if (!args || list_length(args) != 2)
+        return false;
+
+    match = false;
+    foreach (lc, args)
+    {
+        match = false;
+
+        if (!IsA(lfirst(lc), Var))

Review Comment:
I think that expressions like `t1.c1 = 5` should be pushed down by the optimizer to operators such as SeqScan for early processing. Therefore, this feature does not handle expressions of the form `t1.c1 = 5`.
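The eligibility rule discussed in this thread (both sides of the hash clause must be plain column references, one from each join input) can be illustrated in isolation. The sketch below is a simplified stand-in and not the PR's `CheckEqualArgs`: the `ToyExpr` type, its fields, and `toy_check_equal_args` are hypothetical, and the integer-equality operator check is assumed to have happened already.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, heavily simplified expression node. */
typedef enum ToyExprKind { TOY_VAR, TOY_CONST, TOY_OPEXPR } ToyExprKind;
typedef enum ToySide { TOY_OUTER, TOY_INNER } ToySide;

typedef struct ToyExpr
{
    ToyExprKind kind;
    ToySide     side;   /* only meaningful when kind == TOY_VAR */
    int         attno;  /* 1-based column number when kind == TOY_VAR */
} ToyExpr;

/*
 * Accept only "outer Var = inner Var" (in either order). On success, store
 * the outer column in *lattno and the inner column in *rattno.
 */
bool
toy_check_equal_args(const ToyExpr *left, const ToyExpr *right,
                     int *lattno, int *rattno)
{
    const ToyExpr *args[2];
    int            outer = -1;
    int            inner = -1;
    int            i;

    if (lattno == NULL || rattno == NULL)
        return false;

    args[0] = left;
    args[1] = right;
    for (i = 0; i < 2; i++)
    {
        /* Constants and nested expressions such as "c2 + 10" are rejected. */
        if (args[i] == NULL || args[i]->kind != TOY_VAR)
            return false;

        if (args[i]->side == TOY_OUTER)
            outer = args[i]->attno;
        else
            inner = args[i]->attno;
    }

    /* Both Vars on the same side leaves one of the two unset. */
    if (outer < 1 || inner < 1)
        return false;

    *lattno = outer;
    *rattno = inner;
    return true;
}
```

Under these assumptions, `t1.c2 = t2.c2` qualifies, while `t1.c2 = (t2.c2 + 10)` and `t1.c1 = 5` do not, because one argument is an operator expression or a constant rather than a Var.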
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856482708

## src/backend/executor/nodeHash.c: ##
@@ -4126,3 +4151,138 @@ get_hash_mem(void)
     return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+    ListCell   *lc;
+    List       *scankeys;
+    ScanKey     sk;
+    AttrFilter *attr_filter;
+
+    foreach (lc, node->filters)
+    {
+        scankeys = NIL;
+
+        attr_filter = lfirst(lc);
+        if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+            continue;
+
+        /* bloom filter */
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = SK_BLOOM_FILTER;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+        scankeys = lappend(scankeys, sk);
+
+        /* range filter */
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = 0;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_strategy = BTGreaterEqualStrategyNumber;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = attr_filter->min;
+        scankeys = lappend(scankeys, sk);
+
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = 0;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_strategy = BTLessEqualStrategyNumber;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = attr_filter->max;
+        scankeys = lappend(scankeys, sk);
+
+        /* append new runtime filters to target node */
+        SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+        sss->filters = list_concat(sss->filters, scankeys);

Review Comment:
1. Combining Bloom filters results in a higher False Positive Rate (FPR) than using each of the individual Bloom filters separately, so it is not recommended;
2. Combining range filters has the same problem as combining Bloom filters;
3. ~~There is only one Bloom filter and one range filter on the same attribute in many cases;~~
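The range filter in the quoted diff is a pair of scan keys (`>= min` and `<= max`) on the probe-side column. As a rough illustration of how such a filter is built on the hash side and tested on the scan side, here is a self-contained sketch; `ToyRangeFilter` and the `toy_range_*` functions are hypothetical names, and treating an empty filter as "pass everything" is an assumption modeled on the `attr_filter->empty` check, which skips the pushdown entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical min/max summary of the build-side join keys. */
typedef struct ToyRangeFilter
{
    bool    empty;  /* true until the first non-NULL key is seen */
    int64_t min;
    int64_t max;
} ToyRangeFilter;

void
toy_range_init(ToyRangeFilter *rf)
{
    rf->empty = true;
    rf->min = INT64_MAX;
    rf->max = INT64_MIN;
}

/* Called once per build-side key while the hash table is being filled. */
void
toy_range_add(ToyRangeFilter *rf, int64_t val)
{
    rf->empty = false;
    if (val < rf->min)
        rf->min = val;
    if (val > rf->max)
        rf->max = val;
}

/* Called per probe-side row: false means the row cannot find a join partner. */
bool
toy_range_pass(const ToyRangeFilter *rf, int64_t val)
{
    if (rf->empty)
        return true;    /* assumption: an empty filter is never pushed down */

    return val >= rf->min && val <= rf->max;
}
```

With build-side keys 1 through 4, a probe value of 5 is rejected by the range check alone, before any Bloom filter lookup is needed.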
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1869418850

## src/backend/executor/nodeSeqscan.c: ##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
     /*
      * get the next tuple from the table
      */
-    if (table_scan_getnextslot(scandesc, direction, slot))
+    while (table_scan_getnextslot(scandesc, direction, slot))
+    {
+        if (TupIsNull(slot))
+            return slot;
+
+        if (node->filter_in_seqscan && node->filters &&
+            !PassByBloomFilter(node, slot))

Review Comment:
Yes. When creating the Bloom filter, the system evaluates the estimated number of rows that this hash join will process and the amount of available memory, as determined during execution plan generation. Based on this evaluation, it decides whether filtering data with a Bloom filter would be effective. If the Bloom filter is judged unlikely to improve performance sufficiently, it is not created.
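The decision described above can be pictured as a sizing check. The sketch below assumes a purely hypothetical policy: budget a fixed number of bits per estimated build-side row and skip the filter when that exceeds the memory budget. The constant `TOY_BITS_PER_ROW`, the function name, and the policy itself are illustrative assumptions and are not taken from `CreateAttrFilter`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical sizing target; real implementations choose this differently. */
#define TOY_BITS_PER_ROW 16u

/*
 * Decide whether a Bloom filter for plan_rows estimated rows fits into
 * budget_bytes of memory. Returns false when the estimate is unusable or
 * the filter would be too large to be worthwhile.
 */
bool
toy_should_create_bloom(double plan_rows, uint64_t budget_bytes)
{
    uint64_t rows;
    uint64_t bytes_needed;

    /* No rows expected, or an estimate too large to size safely. */
    if (plan_rows < 1.0 || plan_rows > 1e15)
        return false;

    rows = (uint64_t) plan_rows;
    bytes_needed = rows * TOY_BITS_PER_ROW / 8;

    return bytes_needed <= budget_bytes;
}
```

For example, an estimate of 32 rows needs 64 bytes under this policy and easily fits, while an estimate of one billion rows would need about 2 GB and is declined for a 1 MB budget.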
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1869401110

## src/backend/executor/nodeSeqscan.c: ##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
     /*
      * get the next tuple from the table
      */
-    if (table_scan_getnextslot(scandesc, direction, slot))
+    while (table_scan_getnextslot(scandesc, direction, slot))
+    {
+        if (TupIsNull(slot))
+            return slot;
+
+        if (node->filter_in_seqscan && node->filters &&
+            !PassByBloomFilter(node, slot))

Review Comment:
**Similarities:** In this feature, the creation and testing of the Bloom filter use the same implementation as the gp_enable_runtime_filter feature.

**Differences:**
- The gp_enable_runtime_filter feature decides during the optimizer phase whether to use a Bloom filter to filter data. It inserts the Bloom-filter step as a nodeRuntimeFilter node into the left subtree of a Hash Join.
- In contrast, this new feature does not insert a node. Instead, it pushes the Bloom filter and other filters directly down to the SeqScan or even the Access Method (AM), applying the filters closer to where the data is produced. This makes filtering more efficient than with gp_enable_runtime_filter.

Because the filters sit closer to the data source, rows are discarded before they reach higher-level operators, which can reduce the amount of data that subsequent operations in the query plan have to process.
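To make the "filter inside the scan loop" idea concrete, here is a small self-contained sketch: a toy Bloom filter is filled from the build-side keys, and the scan loop drops rows that cannot match before returning them to the parent operator. Everything here (`ToyBloom`, `toy_mix`, `toy_scan`, the fixed 1024-bit size, two probe positions per key) is an illustrative assumption; the PR itself reuses the existing Bloom filter implementation rather than anything like this.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_BLOOM_BITS 1024u

typedef struct ToyBloom
{
    uint8_t bits[TOY_BLOOM_BITS / 8];
} ToyBloom;

/* A simple 64-bit mixing function (splitmix64-style finalizer). */
static uint64_t
toy_mix(uint64_t x)
{
    x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9ULL;
    x = (x ^ (x >> 27)) * 0x94d049bb133111ebULL;
    return x ^ (x >> 31);
}

/* Derive two bit positions from the low and high halves of one hash. */
static void
toy_positions(int64_t key, uint32_t pos[2])
{
    uint64_t h = toy_mix((uint64_t) key);

    pos[0] = (uint32_t) (h % TOY_BLOOM_BITS);
    pos[1] = (uint32_t) ((h >> 32) % TOY_BLOOM_BITS);
}

void
toy_bloom_init(ToyBloom *bf)
{
    memset(bf->bits, 0, sizeof(bf->bits));
}

/* Hash side: record one build-side join key. */
void
toy_bloom_add(ToyBloom *bf, int64_t key)
{
    uint32_t pos[2];
    int      i;

    toy_positions(key, pos);
    for (i = 0; i < 2; i++)
        bf->bits[pos[i] / 8] |= (uint8_t) (1u << (pos[i] % 8));
}

/* Scan side: false means "definitely absent", true means "maybe present". */
bool
toy_bloom_might_contain(const ToyBloom *bf, int64_t key)
{
    uint32_t pos[2];
    int      i;

    toy_positions(key, pos);
    for (i = 0; i < 2; i++)
    {
        if (!(bf->bits[pos[i] / 8] & (1u << (pos[i] % 8))))
            return false;
    }
    return true;
}

/*
 * Toy scan loop: count the rows that would be handed to the parent node.
 * A NULL filter models the feature being switched off.
 */
size_t
toy_scan(const int64_t *rows, size_t nrows, const ToyBloom *bf)
{
    size_t emitted = 0;
    size_t i;

    for (i = 0; i < nrows; i++)
    {
        if (bf != NULL && !toy_bloom_might_contain(bf, rows[i]))
            continue;   /* dropped inside the scan, never reaches the join */
        emitted++;
    }
    return emitted;
}
```

A Bloom filter never produces false negatives, so every row with a real join partner still reaches the Hash Join; false positives only mean that a few non-matching rows are passed up and rejected by the join as before.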
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1864161626

## src/test/regress/expected/gp_runtime_filter.out: ##
@@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf
   1600
 (1 row)

+-- Test bloom filter pushdown
+DROP TABLE IF EXISTS t1;
+NOTICE: table "t1" does not exist, skipping
+DROP TABLE IF EXISTS t2;
+NOTICE: table "t2" does not exist, skipping
+CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1);
+CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED;
+INSERT INTO t1 VALUES (5,5,5,5,5);
+INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+ANALYZE;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN
+---
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1)
+   -> Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         -> Seq Scan on t1 (actual rows=128 loops=1)
+         -> Hash (actual rows=32 loops=1)
+               Buckets: 524288 Batches: 1 Memory Usage: 4098kB
+               -> Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN
+---
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1)
+   -> Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         -> Seq Scan on t1 (actual rows=1 loops=1)
+         -> Hash (actual rows=32 loops=1)
+               Buckets: 524288 Batches: 1 Memory Usage: 4098kB

Review Comment:
Added a debug message in https://github.com/apache/cloudberry/pull/724/commits/274d8aa1ff0b33a4d24e3a7765b493904c61dd60
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1864115997 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || 
funcid == F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + boolmatch; + List*args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + args = ((OpExpr *)expr)->args; + else if (IsA(expr, FuncExpr)) + args = ((FuncExpr *)expr)->args; + else + return false; + + if (!args || list_length(args) != 2) + return false; + + match = false; + foreach (lc, args) + { + match = false; + + if (!IsA(lfirst(lc), Var)) + break; + + var = lfirst(lc); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + break; + + match = true; + } + + return match; +} + +/* + * it's just allowed like this: + * HashJoin + * ... a series of HashJoin nodes + *HashJoin + * SeqScan <- target + */ +static PlanState * +FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) +{ + Var *var; + PlanState *child, *parent; + TargetEntry *te; + + parent = (PlanState *)hjstate; + child = outerPlanState(hjstate); + Assert(child); + + *lattno = -1; + while (child) + { + /* target is seqscan */ + if (IsA(child, SeqScanState)) + { + te = (Ta
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724: URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2503107713

> There are codes changed in MultiExecParallelHash, please add some parallel tests with runtime filter.

Got it.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
avamingli commented on PR #724: URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2503099464

There are code changes in MultiExecParallelHash; please add some parallel tests with the runtime filter.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857762470

## src/backend/executor/nodeHashjoin.c: ##
@@ -162,6 +164,16 @@ static void ReleaseHashTable(HashJoinState *node);
 static void SpillCurrentBatch(HashJoinState *node);
 static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate);
 static void ExecEagerFreeHashJoin(HashJoinState *node);
+static void CreateRuntimeFilter(HashJoinState* hjstate);
+static bool IsEqualOp(Expr *expr);
+static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno);
+static PlanState *FindTargetAttr(HashJoinState *hjstate,

Review Comment:
Fixed in https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857762107 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || 
funcid == F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + boolmatch; + List*args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; Review Comment: fix with https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf ## src/backend/executor/nodeSeqscan.c: ## @@ -87,8 +101,17 @@ SeqNext(SeqScanState *node) /* * get the next tuple from the table */ - if (table_scan_getnextslot(scandesc, direction, slot)) + while (table_scan_getnextslot(scandesc, direction, slot)) + { + if (TupIsNull(slot)) + return slot; Review Comment: fix with https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf ## src/backend/executor/nodeSeqscan.c: ## @@ -87,8 +101,17 @@ SeqNext(SeqScanState *node) /* * get the next tuple from the table */ - if (table_scan_getnextslot(scandesc, direction, slot)) + while (table_scan_getnextslot(scandesc, direction, slot)) + { + if (TupIsNull(slot)) + return slot; Review Comment: fix with https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclau
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857761549 ## src/backend/executor/nodeHash.c: ## @@ -4126,3 +4151,138 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell*lc; + List*scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= SK_BLOOM_FILTER; + sk->sk_attno= attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); + } +} + +static void +BuildRuntimeFilter(HashState *node, TupleTableSlot *slot) +{ + Datum val; + bool isnull; + ListCell*lc; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + attr_filter = (AttrFilter *) lfirst(lc); + + val = slot_getattr(slot, attr_filter->rattno, &isnull); + if (isnull) + continue; + + attr_filter->empty = false; + + if ((int64_t)val < (int64_t)attr_filter->min) 
+ attr_filter->min = val; + + if ((int64_t)val > (int64_t)attr_filter->max) + attr_filter->max = val; + + if (attr_filter->blm_filter) + bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum)); + } +} + +void +FreeRuntimeFilter(HashState *node) +{ + ListCell*lc; + AttrFilter *attr_filter; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + } + + list_free_deep(node->filters); + node->filters = NIL; +} + +void +ResetRuntimeFilter(HashState *node) +{ + ListCell*lc; + AttrFilter *attr_filter; + SeqScanState*sss; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + attr_filter->empty = true; + + if (IsA(attr_filter->target, SeqScanState)) + { + sss = castNode(SeqScanState, attr_filter->target); + if (sss->filters) + { + list_free_deep(sss->filters); + sss->filters = NIL; + } + } + + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + + attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, +work_mem, random()); + attr_filter->min= LLONG_MAX; + attr_filter->max= LLONG_MIN; Review Comment: fix with https://github.com/apache/cloudberry/pull/724/commits/99eabb224f89dcdfd7117ef37a5986ab8001abdf -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For queries about this service, please contact Infrastruc
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857739192 ## src/backend/executor/nodeHashjoin.c: ## @@ -162,6 +164,16 @@ static void ReleaseHashTable(HashJoinState *node); static void SpillCurrentBatch(HashJoinState *node); static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate); static void ExecEagerFreeHashJoin(HashJoinState *node); +static void CreateRuntimeFilter(HashJoinState* hjstate); +static bool IsEqualOp(Expr *expr); +static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno); +static PlanState *FindTargetAttr(HashJoinState *hjstate, Review Comment: Good idea, I will fix it.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857713706 ## src/backend/executor/nodeHash.c: ## @@ -4126,3 +4151,138 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell*lc; + List*scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= SK_BLOOM_FILTER; + sk->sk_attno= attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); + } +} + +static void +BuildRuntimeFilter(HashState *node, TupleTableSlot *slot) +{ + Datum val; + bool isnull; + ListCell*lc; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + attr_filter = (AttrFilter *) lfirst(lc); + + val = slot_getattr(slot, attr_filter->rattno, &isnull); + if (isnull) + continue; + + attr_filter->empty = false; + + if ((int64_t)val < (int64_t)attr_filter->min) + 
attr_filter->min = val; + + if ((int64_t)val > (int64_t)attr_filter->max) + attr_filter->max = val; + + if (attr_filter->blm_filter) + bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum)); + } +} + +void +FreeRuntimeFilter(HashState *node) +{ + ListCell*lc; + AttrFilter *attr_filter; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + } + + list_free_deep(node->filters); + node->filters = NIL; +} + +void +ResetRuntimeFilter(HashState *node) +{ + ListCell*lc; + AttrFilter *attr_filter; + SeqScanState*sss; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + attr_filter->empty = true; + + if (IsA(attr_filter->target, SeqScanState)) + { + sss = castNode(SeqScanState, attr_filter->target); + if (sss->filters) + { + list_free_deep(sss->filters); + sss->filters = NIL; + } + } + + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + + attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, +work_mem, random()); + attr_filter->min= LLONG_MAX; + attr_filter->max= LLONG_MIN; Review Comment: use LONG_MAX, LONG_MIN instead ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org ---
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857711528 ## src/backend/executor/nodeHash.c: ## @@ -4126,3 +4151,138 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell*lc; + List*scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= SK_BLOOM_FILTER; + sk->sk_attno= attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); Review Comment: got it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857505809 ## src/backend/executor/nodeSeqscan.c: ## @@ -87,8 +101,17 @@ SeqNext(SeqScanState *node) /* * get the next tuple from the table */ - if (table_scan_getnextslot(scandesc, direction, slot)) + while (table_scan_getnextslot(scandesc, direction, slot)) + { + if (TupIsNull(slot)) + return slot; + + if (node->filter_in_seqscan && node->filters && + !PassByBloomFilter(node, slot)) Review Comment: What is the difference between this and the Bloom filter used by `gp_enable_runtime_filter`?
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1857443716 ## src/backend/executor/nodeHash.c: ## @@ -4126,3 +4151,138 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell*lc; + List*scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= SK_BLOOM_FILTER; + sk->sk_attno= attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); Review Comment: ```sql create table t1(a int, b int) with(parallel_workers=2); create table rt1(a int, b int) with(parallel_workers=2); create table rt2(a int, b int); create table rt3(a int, b int); insert into t1 select i, i from generate_series(1, 10) i; insert into t1 select i, i+1 from generate_series(1, 10) i; insert into rt1 select i, i+1 from generate_series(1, 10) i; insert into rt2 select i, i+1 
from generate_series(1, 1) i; insert into rt3 select i, i+1 from generate_series(1, 10) i; analyze t1; analyze rt1; analyze rt2; analyze rt3; explain analyze select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; postgres=# explain select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; QUERY PLAN Gather Motion 3:1 (slice1; segments: 3) (cost=2.45..428.51 rows=17 width=24) -> Hash Join (cost=2.45..428.29 rows=6 width=24) Hash Cond: (t1.b = rt1.a) -> Hash Join (cost=1.23..427.00 rows=6 width=16) Hash Cond: (t1.b = rt3.a) -> Seq Scan on t1 (cost=0.00..342.37 rows=7 width=8) -> Hash (cost=1.10..1.10 rows=10 width=8) -> Seq Scan on rt3 (cost=0.00..1.10 rows=10 width=8) -> Hash (cost=1.10..1.10 rows=10 width=8) -> Seq Scan on rt1 (cost=0.00..1.10 rows=10 width=8) Optimizer: Postgres query optimizer (11 rows) ``` You can try this case; you will get two range filters on the same scan.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856688955 ## src/backend/executor/nodeHash.c: ## @@ -4126,3 +4151,138 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell*lc; + List*scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= SK_BLOOM_FILTER; + sk->sk_attno= attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); + } +} + +static void +BuildRuntimeFilter(HashState *node, TupleTableSlot *slot) +{ + Datum val; + bool isnull; + ListCell*lc; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + attr_filter = (AttrFilter *) lfirst(lc); + + val = slot_getattr(slot, attr_filter->rattno, &isnull); + if (isnull) + continue; + + attr_filter->empty = false; + + if ((int64_t)val < (int64_t)attr_filter->min) 
+ attr_filter->min = val; + + if ((int64_t)val > (int64_t)attr_filter->max) + attr_filter->max = val; + + if (attr_filter->blm_filter) + bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum)); + } +} + +void +FreeRuntimeFilter(HashState *node) +{ + ListCell*lc; + AttrFilter *attr_filter; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + } + + list_free_deep(node->filters); + node->filters = NIL; +} + +void +ResetRuntimeFilter(HashState *node) +{ + ListCell*lc; + AttrFilter *attr_filter; + SeqScanState*sss; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + attr_filter->empty = true; + + if (IsA(attr_filter->target, SeqScanState)) + { + sss = castNode(SeqScanState, attr_filter->target); + if (sss->filters) + { + list_free_deep(sss->filters); + sss->filters = NIL; + } + } + + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + + attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, +work_mem, random()); + attr_filter->min= LLONG_MAX; + attr_filter->max= LLONG_MIN; Review Comment: I see `StaticAssertDecl(SIZEOF_DATUM == 8, "sizeof datum is not 8");` in postgres.h, so it's better to use INT64_MAX/INT64_MIN here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For queries about this servi
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856568858 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || 
funcid == F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + boolmatch; + List*args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; Review Comment: fix it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For additional commands, e-mail: commits-h...@cloudberry.apache.org
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856560913 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || 
funcid == F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + boolmatch; + List*args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + args = ((OpExpr *)expr)->args; + else if (IsA(expr, FuncExpr)) + args = ((FuncExpr *)expr)->args; + else + return false; + + if (!args || list_length(args) != 2) + return false; + + match = false; + foreach (lc, args) + { + match = false; + + if (!IsA(lfirst(lc), Var)) + break; + + var = lfirst(lc); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + break; + + match = true; + } + + return match; Review Comment: use the more intuitive way to refactor the code, like below ``` /* check the first arg */ ... /* check the second arg */ ... return true; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr.
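The refactoring suggested above (check the first argument, check the second, then return true) could look roughly like the following. This is only a sketch with mock types so that it compiles on its own: `MockVar`, `MOCK_INNER_VAR`, `MOCK_OUTER_VAR`, `record_side` and `check_equal_args` are hypothetical names, and a NULL pointer stands in for "the argument is not a Var node".

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins; the real definitions live in the PostgreSQL headers. */
#define MOCK_INNER_VAR 1
#define MOCK_OUTER_VAR 2

typedef struct MockVar
{
	int varno;    /* which side of the join the column comes from */
	int varattno; /* attribute number within that side */
} MockVar;

/* Store one argument's attno on the matching side; false if it fits neither. */
bool
record_side(const MockVar *var, int *lattno, int *rattno)
{
	if (var == NULL) /* models "not a Var node" */
		return false;

	if (var->varno == MOCK_INNER_VAR)
		*rattno = var->varattno;
	else if (var->varno == MOCK_OUTER_VAR)
		*lattno = var->varattno;
	else
		return false;

	return true;
}

bool
check_equal_args(const MockVar *first, const MockVar *second,
				 int *lattno, int *rattno)
{
	assert(lattno != NULL && rattno != NULL);

	/* check the first arg */
	if (!record_side(first, lattno, rattno))
		return false;

	/* check the second arg */
	if (!record_side(second, lattno, rattno))
		return false;

	return true;
}
```

As in the loop version quoted in the diff, this does not by itself guarantee that one argument is inner and the other outer; the caller still has to reject the case where either attribute number was left unset.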
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1856482708 ## src/backend/executor/nodeHash.c: ## @@ -4126,3 +4151,138 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell*lc; + List*scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= SK_BLOOM_FILTER; + sk->sk_attno= attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); Review Comment: 1. Combining Bloom filters will result in a higher False Positive Rate (FPR) compared to using each of the individual Bloom filters separately, so it is not recommended; 2. There is the same problem to combine range filters like combining Bloom filters; 3. 
There is only one Bloom filter and one range filter on the same attribute in many cases.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
gfphoenix78 commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855868906 ## src/test/regress/expected/gp_runtime_filter.out: ## @@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf 1600 (1 row) +-- Test bloom filter pushdown +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; +QUERY PLAN +--- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (never executed) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. 
+         ->  Seq Scan on t1 (actual rows=128 loops=1)
+         ->  Hash (actual rows=32 loops=1)
+               Buckets: 524288  Batches: 1  Memory Usage: 4098kB
+               ->  Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN
+---
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1)
+   ->  Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         ->  Seq Scan on t1 (actual rows=1 loops=1)
+         ->  Hash (actual rows=32 loops=1)
+               Buckets: 524288  Batches: 1  Memory Usage: 4098kB

Review Comment:
> how to debug and get pushdown scankey here ?

Add a debug message that dumps stats on how many tuples are filtered out?
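One way to read the suggestion above is a pair of per-scan counters that get formatted into a debug line when the scan ends. The sketch below is only an illustration under that assumption: `FilterStatsSketch`, `filter_stats_record` and `filter_stats_format` are hypothetical names that do not exist in the PR, and a real patch would presumably report through `elog` or the EXPLAIN instrumentation rather than `snprintf`.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical per-scan counters; not part of the PR. */
typedef struct FilterStatsSketch
{
    unsigned long scanned;   /* tuples fetched from the table */
    unsigned long filtered;  /* tuples rejected by a runtime filter */
} FilterStatsSketch;

/* Record the outcome of one tuple: passed == false means it was filtered out. */
void
filter_stats_record(FilterStatsSketch *stats, bool passed)
{
    stats->scanned++;
    if (!passed)
        stats->filtered++;
}

/* Format the counters into buf; returns what snprintf returns. */
int
filter_stats_format(const FilterStatsSketch *stats, char *buf, size_t len)
{
    return snprintf(buf, len,
                    "runtime filter: %lu of %lu tuples filtered out",
                    stats->filtered, stats->scanned);
}
```

With counters like these, the regression test could check the filtered count directly instead of inferring pushdown from `actual rows=1` on the Seq Scan.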
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
gfphoenix78 commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855867578 ## src/test/regress/expected/gp_runtime_filter.out: ## @@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf 1600 (1 row) +-- Test bloom filter pushdown +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; +QUERY PLAN +--- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (never executed) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. 
+ -> Seq Scan on t1 (actual rows=128 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(9 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; +QUERY PLAN +--- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (never executed) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=1 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB Review Comment: The test case should cover that the hash join node or result node is the child of the parent hash join. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For additional commands, e-mail: commits-h...@cloudberry.apache.org
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
gfphoenix78 commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855838595

## src/backend/executor/nodeSeqscan.c: ##
@@ -365,3 +398,60 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
     }
     node->ss.ss_currentScanDesc = scandesc;
 }
+
+/*
+ * Returns true if the element may be in the bloom filter.
+ */
+static bool
+PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
+{
+    ScanKey         sk;
+    Datum           val;
+    bool            isnull;
+    ListCell       *lc;
+    bloom_filter   *blm_filter;
+
+    foreach (lc, node->filters)
+    {
+        sk = lfirst(lc);
+        if (sk->sk_flags != SK_BLOOM_FILTER)
+            continue;
+
+        val = slot_getattr(slot, sk->sk_attno, &isnull);
+        if (isnull)
+            continue;
+
+        blm_filter = (bloom_filter *)DatumGetPointer(sk->sk_argument);
+        if (bloom_lacks_element(blm_filter, (unsigned char *)&val, sizeof(Datum)))
+            return false;
+    }
+
+    return true;
+}
+
+/*
+ * Convert the list of ScanKey to the array, and append an emtpy ScanKey as
+ * the end flag of the array.
+ */
+static ScanKey
+ScanKeyListToArray(List *keys, int *num)
+{
+    ScanKey sk;
+
+    if (list_length(keys) == 0)
+        return NULL;
+
+    Assert(num);
+    *num = list_length(keys);
+
+    sk = (ScanKey)palloc(sizeof(ScanKeyData) * (*num + 1));
+    for (int i = 0; i < *num; ++i)
+        memcpy(&sk[i], list_nth(keys, i), sizeof(ScanKeyData));
+
+    /*
+     * SK_EMPYT means the end of the array of the ScanKey
+     */
+    sk[*num].sk_flags = SK_EMPYT;

Review Comment: How is the boundary of the `ScanKey` array checked in rescan? In a normal rescan, the number of `ScanKey`s is the same as in begin_scan. If rescan is given more `ScanKey`s than begin_scan was, the boundary value might be invalid and dangerous to access.
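To make the boundary concern concrete, here is a minimal sketch of one possible guard: remember the capacity allocated at begin_scan, count keys up to the sentinel without reading past that capacity, and refuse a rescan whose key count does not fit. Everything here is a stand-in (`ScanKeySketch`, `SK_END_SKETCH`, `count_scan_keys`, `rescan_keys_fit` are hypothetical and are not the PR's or PostgreSQL's definitions).

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical sentinel flag marking the end of the key array. */
#define SK_END_SKETCH 0x8000

/* Simplified stand-in for ScanKeyData; only the fields used here. */
typedef struct ScanKeySketch
{
    int sk_flags;
    int sk_attno;
} ScanKeySketch;

/*
 * Count the keys that precede the sentinel. The array is assumed to hold
 * at most max_keys keys plus one sentinel slot, so the loop never reads
 * beyond index max_keys. Returns -1 if no sentinel is found in that range.
 */
int
count_scan_keys(const ScanKeySketch *keys, int max_keys)
{
    if (keys == NULL)
        return 0;

    for (int i = 0; i <= max_keys; i++)
    {
        if (keys[i].sk_flags == SK_END_SKETCH)
            return i;
    }
    return -1;
}

/* A rescan may reuse the begin_scan buffer only if its keys fit. */
bool
rescan_keys_fit(int nkeys_rescan, int capacity_at_begin_scan)
{
    return nkeys_rescan >= 0 && nkeys_rescan <= capacity_at_begin_scan;
}
```

If the check fails, the caller would have to reallocate the array (or reject the rescan) rather than write past the buffer sized at begin_scan.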
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
gfphoenix78 commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855812576

## src/backend/executor/nodeSeqscan.c: ##
@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
     /*
      * get the next tuple from the table
      */
-    if (table_scan_getnextslot(scandesc, direction, slot))
+    while (table_scan_getnextslot(scandesc, direction, slot))
+    {
+        if (TupIsNull(slot))
+            return slot;

Review Comment: Will this condition ever be true?
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
gfphoenix78 commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855741525 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || funcid == 
F_INT4EQ || funcid == F_INT8EQ
+        || funcid == F_INT24EQ || funcid == F_INT42EQ
+        || funcid == F_INT28EQ || funcid == F_INT82EQ
+        || funcid == F_INT48EQ || funcid == F_INT84EQ
+        )
+        return true;
+
+    return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+    Var        *var;
+    bool        match;
+    List       *args;
+    ListCell   *lc;
+
+    if (lattno == NULL || rattno == NULL)
+        return false;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        args = ((OpExpr *)expr)->args;
+    else if (IsA(expr, FuncExpr))
+        args = ((FuncExpr *)expr)->args;
+    else
+        return false;
+
+    if (!args || list_length(args) != 2)
+        return false;
+
+    match = false;
+    foreach (lc, args)
+    {
+        match = false;
+
+        if (!IsA(lfirst(lc), Var))
+            break;
+
+        var = lfirst(lc);
+        if (var->varno == INNER_VAR)
+            *rattno = var->varattno;
+        else if (var->varno == OUTER_VAR)
+            *lattno = var->varattno;
+        else
+            break;
+
+        match = true;
+    }
+
+    return match;

Review Comment: The `match` flag is modified in several places, which makes the code hard to read. Each `break` statement could be replaced by `return false;`. If the foreach loop runs to completion, all conditions matched, so the function can simply return true.
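The early-return shape suggested in this comment might look roughly like the sketch below. It is a self-contained illustration, not the PR's code: `VarSketch`, `INNER_VAR_SKETCH`, `OUTER_VAR_SKETCH` and `check_equal_args_sketch` are hypothetical stand-ins for the PostgreSQL node types and macros, and a plain array replaces the `List` of arguments.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for PostgreSQL definitions; values are arbitrary. */
typedef short AttrNumber;
enum { INNER_VAR_SKETCH = -1, OUTER_VAR_SKETCH = -2 };

typedef struct VarSketch
{
    bool       is_var;    /* stands in for IsA(node, Var) */
    int        varno;
    AttrNumber varattno;
} VarSketch;

/*
 * Same checks as the loop under review, but every failure returns
 * immediately, so no match flag is needed.
 */
bool
check_equal_args_sketch(const VarSketch *args, size_t nargs,
                        AttrNumber *lattno, AttrNumber *rattno)
{
    if (lattno == NULL || rattno == NULL)
        return false;

    if (args == NULL || nargs != 2)
        return false;

    for (size_t i = 0; i < nargs; i++)
    {
        if (!args[i].is_var)
            return false;

        if (args[i].varno == INNER_VAR_SKETCH)
            *rattno = args[i].varattno;
        else if (args[i].varno == OUTER_VAR_SKETCH)
            *lattno = args[i].varattno;
        else
            return false;
    }

    /* Reaching here means both arguments passed every check. */
    return true;
}
```

As in the original, the function does not itself verify that one argument is inner and the other outer; the caller's `lattno < 1 || rattno < 1` test still covers that case.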
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
gfphoenix78 commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855733105 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || funcid == 
F_INT4EQ || funcid == F_INT8EQ
+        || funcid == F_INT24EQ || funcid == F_INT42EQ
+        || funcid == F_INT28EQ || funcid == F_INT82EQ
+        || funcid == F_INT48EQ || funcid == F_INT84EQ
+        )
+        return true;
+
+    return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+    Var        *var;
+    bool        match;
+    List       *args;
+    ListCell   *lc;
+
+    if (lattno == NULL || rattno == NULL)
+        return false;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;

Review Comment: These two lines duplicate the check made by the if / else if / else chain that follows, so they could be deleted.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
gfphoenix78 commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855727314

## src/backend/executor/nodeHashjoin.c: ##
@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
         ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
     }
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+    AttrNumber  lattno, rattno;
+    Expr       *expr;
+    JoinType    jointype;
+    HashJoin   *hj;
+    HashState  *hstate;
+    PlanState  *target;
+    AttrFilter *attr_filter;
+    ListCell   *lc;
+
+    /*
+     * Only applicatable for inner, right and semi join,
+     */

Review Comment: Could you explain in a little more detail why these join types are supported and the others are not?
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
gfphoenix78 commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1855726047 ## src/backend/executor/nodeHash.c: ## @@ -4126,3 +4151,138 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell*lc; + List*scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= SK_BLOOM_FILTER; + sk->sk_attno= attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags= 0; + sk->sk_attno= attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); + } +} + +static void +BuildRuntimeFilter(HashState *node, TupleTableSlot *slot) +{ + Datum val; + bool isnull; + ListCell*lc; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + attr_filter = (AttrFilter *) lfirst(lc); + + val = slot_getattr(slot, attr_filter->rattno, &isnull); + if (isnull) + continue; + + attr_filter->empty = false; + + if ((int64_t)val < (int64_t)attr_filter->min) + 
attr_filter->min = val;
+
+        if ((int64_t)val > (int64_t)attr_filter->max)
+            attr_filter->max = val;
+
+        if (attr_filter->blm_filter)
+            bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum));
+    }
+}
+
+void
+FreeRuntimeFilter(HashState *node)
+{
+    ListCell   *lc;
+    AttrFilter *attr_filter;
+
+    if (!node->filters)
+        return;
+
+    foreach (lc, node->filters)
+    {
+        attr_filter = lfirst(lc);
+        if (attr_filter->blm_filter)
+            bloom_free(attr_filter->blm_filter);
+    }
+
+    list_free_deep(node->filters);
+    node->filters = NIL;
+}
+
+void
+ResetRuntimeFilter(HashState *node)
+{
+    ListCell       *lc;
+    AttrFilter     *attr_filter;
+    SeqScanState   *sss;
+
+    if (!node->filters)
+        return;
+
+    foreach (lc, node->filters)
+    {
+        attr_filter = lfirst(lc);
+        attr_filter->empty = true;
+
+        if (IsA(attr_filter->target, SeqScanState))
+        {
+            sss = castNode(SeqScanState, attr_filter->target);
+            if (sss->filters)
+            {
+                list_free_deep(sss->filters);
+                sss->filters = NIL;
+            }
+        }
+
+        if (attr_filter->blm_filter)
+            bloom_free(attr_filter->blm_filter);
+
+        attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
+                                                         work_mem, random());
+        attr_filter->min = LLONG_MAX;
+        attr_filter->max = LLONG_MIN;

Review Comment: LLONG_MAX and LLONG_MIN are platform-specific values, i.e. the bounds of `long long`, which may not be exactly the same width as Datum. For safety, a static assert could be considered.
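A minimal sketch of the static-assert idea follows, assuming Datum is a pointer-sized unsigned integer (as it is in the PostgreSQL 14 code base this project derives from) and that the range bounds are meant to be compared as `int64_t`, as in `BuildRuntimeFilter`. `DatumSketch`, `RangeSketch`, `range_reset` and `range_add` are hypothetical names used only for illustration; the fixed-width `INT64_MAX` / `INT64_MIN` replace `LLONG_MAX` / `LLONG_MIN` so the sentinels match the comparison type exactly.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for Datum; assumed pointer-sized here. */
typedef uintptr_t DatumSketch;

/* Fail the build if a Datum cannot carry a full 64-bit bound. */
_Static_assert(sizeof(DatumSketch) == sizeof(int64_t),
               "range filter bounds assume an 8-byte Datum");

typedef struct RangeSketch
{
    DatumSketch min;
    DatumSketch max;
    bool        empty;
} RangeSketch;

/* Reset to an empty range: min starts at the top, max at the bottom. */
void
range_reset(RangeSketch *r)
{
    r->min = (DatumSketch) INT64_MAX;
    r->max = (DatumSketch) INT64_MIN;
    r->empty = true;
}

/* Widen the range to include val, comparing as signed 64-bit integers. */
void
range_add(RangeSketch *r, DatumSketch val)
{
    if ((int64_t) val < (int64_t) r->min)
        r->min = val;
    if ((int64_t) val > (int64_t) r->max)
        r->max = val;
    r->empty = false;
}
```

On a platform with a 4-byte Datum the assertion stops compilation, which is the safety net the comment asks for; whether the project wants a hard failure or a fallback path there is a separate design decision.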
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata closed pull request #724: Push the runtime filter from HashJoin down to SeqScan or AM. URL: https://github.com/apache/cloudberry/pull/724
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on PR #724: URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2493470306

> Looks interesting. And I have some questions to discuss.
>
> * Beside the seqscan, can the runtime filter apply to other types of scan? such as the index scan.
> * Looks only when the `hashjoin` node and `seqscan` node run in the same process can use the runtime filter. Which means the tables should have same distributed policy on the join columns or one of the table is replicated.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853724139 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || 
funcid == F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + boolmatch; + List*args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + args = ((OpExpr *)expr)->args; + else if (IsA(expr, FuncExpr)) + args = ((FuncExpr *)expr)->args; + else + return false; + + if (!args || list_length(args) != 2) + return false; + + match = false; + foreach (lc, args) + { + match = false; + + if (!IsA(lfirst(lc), Var)) + break; + + var = lfirst(lc); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + break; + + match = true; + } + + return match; +} + +/* + * it's just allowed like this: + * HashJoin + * ... a series of HashJoin nodes + *HashJoin + * SeqScan <- target + */ +static PlanState * +FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) +{ + Var *var; + PlanState *child, *parent; + TargetEntry *te; + + parent = (PlanState *)hjstate; + child = outerPlanState(hjstate); + Assert(child); + + *lattno = -1; + while (child) + { + /* target is seqscan */ + if (IsA(child, SeqScanState)) + { + te = (Ta
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
zhangyue-hashdata commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853719676

## src/backend/executor/nodeSeqscan.c: ##
@@ -365,3 +398,60 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
     }
     node->ss.ss_currentScanDesc = scandesc;
 }
+
+/*
+ * Returns true if the element may be in the bloom filter.
+ */
+static bool
+PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
+{
+    ScanKey         sk;
+    Datum           val;
+    bool            isnull;
+    ListCell       *lc;
+    bloom_filter   *blm_filter;
+
+    foreach (lc, node->filters)
+    {
+        sk = lfirst(lc);
+        if (sk->sk_flags != SK_BLOOM_FILTER)
+            continue;
+
+        val = slot_getattr(slot, sk->sk_attno, &isnull);
+        if (isnull)

Review Comment: I will fix it.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853703384 ## src/backend/executor/nodeHashjoin.c: ## @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr*expr; + JoinTypejointype; + HashJoin*hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell*lc; + + /* +* Only applicatable for inner, right and semi join, +*/ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* +* check and initialize the runtime filter for all hash conds in +* hj->hashclauses +*/ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || funcid == 
F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + boolmatch; + List*args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + args = ((OpExpr *)expr)->args; + else if (IsA(expr, FuncExpr)) + args = ((FuncExpr *)expr)->args; + else + return false; + + if (!args || list_length(args) != 2) + return false; + + match = false; + foreach (lc, args) + { + match = false; + + if (!IsA(lfirst(lc), Var)) + break; + + var = lfirst(lc); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + break; + + match = true; + } + + return match; +} + +/* + * it's just allowed like this: + * HashJoin + * ... a series of HashJoin nodes + *HashJoin + * SeqScan <- target + */ +static PlanState * +FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) +{ + Var *var; + PlanState *child, *parent; + TargetEntry *te; + + parent = (PlanState *)hjstate; + child = outerPlanState(hjstate); + Assert(child); + + *lattno = -1; + while (child) + { + /* target is seqscan */ + if (IsA(child, SeqScanState)) + { + te = (TargetEntry
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
fanfuxiaoran commented on PR #724: URL: https://github.com/apache/cloudberry/pull/724#issuecomment-2492889570

Looks interesting. I have some questions to discuss:

- Besides the seqscan, can the runtime filter apply to other types of scan, such as the index scan?
- It looks like the runtime filter can be used only when the `hashjoin` node and the `seqscan` node run in the same process. That would mean the tables must have the same distribution policy on the join columns, or one of the tables must be replicated.
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
fanfuxiaoran commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853241023

## src/backend/executor/nodeHashjoin.c: ##

@@ -162,6 +164,16 @@ static void ReleaseHashTable(HashJoinState *node);
 static void SpillCurrentBatch(HashJoinState *node);
 static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate);
 static void ExecEagerFreeHashJoin(HashJoinState *node);
+static void CreateRuntimeFilter(HashJoinState* hjstate);
+static bool IsEqualOp(Expr *expr);
+static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno);
+static PlanState *FindTargetAttr(HashJoinState *hjstate,

Review Comment: `FindTargetNode` looks like a better name.

## src/backend/executor/nodeHashjoin.c: ##

@@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state,
         ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
     }
 }
+
+/*
+ * Find "inner var = outer var" in hj->hashclauses and create runtime filter
+ * for it.
+ */
+void
+CreateRuntimeFilter(HashJoinState* hjstate)
+{
+    AttrNumber lattno, rattno;
+    Expr *expr;
+    JoinType jointype;
+    HashJoin *hj;
+    HashState *hstate;
+    PlanState *target;
+    AttrFilter *attr_filter;
+    ListCell *lc;
+
+    /*
+     * Only applicatable for inner, right and semi join,
+     */
+    jointype = hjstate->js.jointype;
+    if (jointype != JOIN_INNER
+        && jointype != JOIN_RIGHT
+        && jointype != JOIN_SEMI
+       )
+        return;
+
+    hstate = castNode(HashState, innerPlanState(hjstate));
+    hstate->filters = NIL;
+
+    /*
+     * check and initialize the runtime filter for all hash conds in
+     * hj->hashclauses
+     */
+    hj = castNode(HashJoin, hjstate->js.ps.plan);
+    foreach (lc, hj->hashclauses)
+    {
+        expr = (Expr *)lfirst(lc);
+
+        if (!IsEqualOp(expr))
+            continue;
+
+        lattno = -1;
+        rattno = -1;
+        if (!CheckEqualArgs(expr, &lattno, &rattno))
+            continue;
+
+        if (lattno < 1 || rattno < 1)
+            continue;
+
+        target = FindTargetAttr(hjstate, lattno, &lattno);
+        if (lattno == -1 || target == NULL || IsA(target, HashJoinState))
+            continue;
+        Assert(IsA(target, SeqScanState));
+
+        attr_filter = CreateAttrFilter(target, lattno, rattno,
+                                       hstate->ps.plan->plan_rows);
+        if (attr_filter->blm_filter)
+            hstate->filters = lappend(hstate->filters, attr_filter);
+        else
+            pfree(attr_filter);
+    }
+}
+
+static bool
+IsEqualOp(Expr *expr)
+{
+    Oid funcid = InvalidOid;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        funcid = ((OpExpr *)expr)->opfuncid;
+    else if (IsA(expr, FuncExpr))
+        funcid = ((FuncExpr *)expr)->funcid;
+    else
+        return false;
+
+    if (funcid == F_INT2EQ || funcid == F_INT4EQ || funcid == F_INT8EQ
+        || funcid == F_INT24EQ || funcid == F_INT42EQ
+        || funcid == F_INT28EQ || funcid == F_INT82EQ
+        || funcid == F_INT48EQ || funcid == F_INT84EQ
+       )
+        return true;
+
+    return false;
+}
+
+/*
+ * runtime filters which can be pushed down:
+ * 1. hash expr MUST BE equal op;
+ * 2. args MUST BE Var node;
+ * 3. the data type MUST BE integer;
+ */
+static bool
+CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
+{
+    Var *var;
+    bool match;
+    List *args;
+    ListCell *lc;
+
+    if (lattno == NULL || rattno == NULL)
+        return false;
+
+    if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr))
+        return false;
+
+    if (IsA(expr, OpExpr))
+        args = ((OpExpr *)expr)->args;
+    else if (IsA(expr, FuncExpr))
+        args = ((FuncExpr *)expr)->args;
+    else
+        return false;
+
+    if (!args || list_length(args) != 2)
+        return false;
+
+    match = false;
+    foreach (lc, args)
+    {
+        match = false;
+
+        if (!IsA(lfirst(lc), Var))

Review Comment: Could it also support other expressions, where one argument is a column attribute and the other is a constant?
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853188280

## src/test/regress/expected/gp_runtime_filter.out: ##

@@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf
   1600
 (1 row)
+-- Test bloom filter pushdown
+DROP TABLE IF EXISTS t1;
+NOTICE: table "t1" does not exist, skipping
+DROP TABLE IF EXISTS t2;
+NOTICE: table "t2" does not exist, skipping
+CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1);
+CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED;
+INSERT INTO t1 VALUES (5,5,5,5,5);
+INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+INSERT INTO t2 select * FROM t2;
+ANALYZE;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN
+---
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1)
+   -> Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         -> Seq Scan on t1 (actual rows=128 loops=1)
+         -> Hash (actual rows=32 loops=1)
+               Buckets: 524288 Batches: 1 Memory Usage: 4098kB
+               -> Seq Scan on t2 (actual rows=32 loops=1)
+ Optimizer: Postgres query optimizer
+(9 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
+QUERY PLAN
+---
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1)
+   -> Hash Join (never executed)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets.
+         -> Seq Scan on t1 (actual rows=1 loops=1)
+         -> Hash (actual rows=32 loops=1)
+               Buckets: 524288 Batches: 1 Memory Usage: 4098kB

Review Comment: How can we debug this and see the scankey that was pushed down here?
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853185274

## src/backend/executor/nodeSeqscan.c: ##

@@ -87,8 +101,17 @@ SeqNext(SeqScanState *node)
     /*
      * get the next tuple from the table
      */
-    if (table_scan_getnextslot(scandesc, direction, slot))
+    while (table_scan_getnextslot(scandesc, direction, slot))
+    {
+        if (TupIsNull(slot))
+            return slot;
+
+        if (node->filter_in_seqscan && node->filters &&
+            !PassByBloomFilter(node, slot))

Review Comment: With TPC-DS at 1 TB, will the bloom filter lose its effectiveness, or fail to be created, because of the large number of rows?
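The concern in the comment above can be illustrated with a toy filter. For a standard Bloom filter with m bits, k hash functions and n inserted keys, the usual approximation of the false-positive rate is (1 - e^(-kn/m))^k, so a filter sized for far fewer rows than it receives passes almost every probe. The sketch below is not the PR's `bloom_filter`; `ToyBloom`, its two hash mixers, and the 1024-bit size are assumptions made purely for illustration.

```c
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Deliberately tiny, fixed-size filter; hypothetical, for illustration only. */
#define TOY_BLOOM_BITS 1024u

typedef struct ToyBloom
{
    uint8_t bits[TOY_BLOOM_BITS / 8];
} ToyBloom;

/* Two cheap integer mixers standing in for real hash functions. */
static uint32_t
toy_hash1(uint64_t key)
{
    key ^= key >> 33;
    key *= UINT64_C(0xff51afd7ed558ccd);
    key ^= key >> 33;
    return (uint32_t) (key % TOY_BLOOM_BITS);
}

static uint32_t
toy_hash2(uint64_t key)
{
    key *= UINT64_C(0x9e3779b97f4a7c15);
    key ^= key >> 29;
    return (uint32_t) (key % TOY_BLOOM_BITS);
}

void
toy_bloom_init(ToyBloom *bf)
{
    memset(bf->bits, 0, sizeof(bf->bits));
}

/* Set the two bits selected by the key. */
void
toy_bloom_add(ToyBloom *bf, uint64_t key)
{
    uint32_t h1 = toy_hash1(key);
    uint32_t h2 = toy_hash2(key);

    bf->bits[h1 / 8] |= (uint8_t) (1u << (h1 % 8));
    bf->bits[h2 / 8] |= (uint8_t) (1u << (h2 % 8));
}

/* True if the key may have been added; false means it definitely was not. */
bool
toy_bloom_may_contain(const ToyBloom *bf, uint64_t key)
{
    uint32_t h1 = toy_hash1(key);
    uint32_t h2 = toy_hash2(key);

    return ((bf->bits[h1 / 8] >> (h1 % 8)) & 1u) &&
           ((bf->bits[h2 / 8] >> (h2 % 8)) & 1u);
}

/* Fraction of bits set; near 1.0 the filter rejects almost nothing. */
double
toy_bloom_fill_ratio(const ToyBloom *bf)
{
    unsigned set = 0;

    for (uint32_t i = 0; i < TOY_BLOOM_BITS; i++)
        set += (bf->bits[i / 8] >> (i % 8)) & 1u;
    return (double) set / TOY_BLOOM_BITS;
}
```

Inserted keys always pass, and the fill ratio can only grow as keys are added. In the quoted patch, `CreateAttrFilter` receives `hstate->ps.plan->plan_rows`, which suggests the real filter is sized from the planner's row estimate; how it behaves when that estimate is very large is the open question here.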
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853184388

## src/backend/executor/nodeSeqscan.c: ##

@@ -365,3 +398,60 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
     }
     node->ss.ss_currentScanDesc = scandesc;
 }
+
+/*
+ * Returns true if the element may be in the bloom filter.
+ */
+static bool
+PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
+{
+    ScanKey sk;
+    Datum val;
+    bool isnull;
+    ListCell *lc;
+    bloom_filter *blm_filter;
+
+    foreach (lc, node->filters)
+    {
+        sk = lfirst(lc);
+        if (sk->sk_flags != SK_BLOOM_FILTER)
+            continue;
+
+        val = slot_getattr(slot, sk->sk_attno, &isnull);
+        if (isnull)

Review Comment:

```sql
CREATE TABLE distinct_1(a int);
CREATE TABLE distinct_2(a int);
INSERT INTO distinct_1 VALUES(1),(2),(NULL);
INSERT INTO distinct_2 VALUES(1),(NULL);
SELECT * FROM distinct_1, distinct_2 WHERE distinct_1.a IS NOT DISTINCT FROM distinct_2.a;
```

This test returns a wrong result.
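One plausible reading of the failing case, sketched in miniature: under `IS NOT DISTINCT FROM`, NULL matches NULL, so a scan-side filter that discards NULL probe values removes a row the join must return. The sketch assumes, as an illustration only, that the pushed-down filter rejects NULLs; the thread does not show what the PR's code does after `if (isnull)`. All names here (`NullableInt`, `strict_equal`, `not_distinct`, `filter_drops_nulls`, `count_matches`) are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>

/* A nullable integer column value. */
typedef struct NullableInt
{
    bool isnull;
    int  value;
} NullableInt;

/* SQL "=" used as a join qual: a NULL on either side never matches. */
bool
strict_equal(NullableInt a, NullableInt b)
{
    return !a.isnull && !b.isnull && a.value == b.value;
}

/* SQL "IS NOT DISTINCT FROM": two NULLs are treated as matching. */
bool
not_distinct(NullableInt a, NullableInt b)
{
    if (a.isnull || b.isnull)
        return a.isnull && b.isnull;
    return a.value == b.value;
}

/* Hypothetical scan-side runtime filter that rejects every NULL. */
bool
filter_drops_nulls(NullableInt probe)
{
    return !probe.isnull;
}

/*
 * Count joined rows with a nested loop. If prefilter is not NULL, outer
 * rows it rejects are skipped before the join qual is evaluated.
 */
size_t
count_matches(const NullableInt *outer, size_t n_outer,
              const NullableInt *inner, size_t n_inner,
              bool (*qual) (NullableInt, NullableInt),
              bool (*prefilter) (NullableInt))
{
    size_t matches = 0;

    for (size_t i = 0; i < n_outer; i++)
    {
        if (prefilter != NULL && !prefilter(outer[i]))
            continue;
        for (size_t j = 0; j < n_inner; j++)
        {
            if (qual(outer[i], inner[j]))
                matches++;
        }
    }
    return matches;
}
```

With `distinct_1 = {1, 2, NULL}` and `distinct_2 = {1, NULL}`, the NULL-safe qual yields two rows without the filter and one row with it, while the strict `=` qual yields one row either way. This is why a NULL-rejecting filter is safe for plain equality joins but not for `IS NOT DISTINCT FROM`.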
Re: [PR] Push the runtime filter from HashJoin down to SeqScan or AM. [cloudberry]
yjhjstz commented on code in PR #724: URL: https://github.com/apache/cloudberry/pull/724#discussion_r1853183808

## src/backend/executor/nodeHash.c: ##

@@ -4126,3 +4151,138 @@ get_hash_mem(void)
     return (int) mem_limit;
 }
+
+/*
+ * Convert AttrFilter to ScanKeyData and send these runtime filters to the
+ * target node(seqscan).
+ */
+void
+PushdownRuntimeFilter(HashState *node)
+{
+    ListCell *lc;
+    List *scankeys;
+    ScanKey sk;
+    AttrFilter *attr_filter;
+
+    foreach (lc, node->filters)
+    {
+        scankeys = NIL;
+
+        attr_filter = lfirst(lc);
+        if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
+            continue;
+
+        /* bloom filter */
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = SK_BLOOM_FILTER;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
+        scankeys = lappend(scankeys, sk);
+
+        /* range filter */
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = 0;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_strategy = BTGreaterEqualStrategyNumber;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = attr_filter->min;
+        scankeys = lappend(scankeys, sk);
+
+        sk = (ScanKey)palloc0(sizeof(ScanKeyData));
+        sk->sk_flags    = 0;
+        sk->sk_attno    = attr_filter->lattno;
+        sk->sk_strategy = BTLessEqualStrategyNumber;
+        sk->sk_subtype  = INT8OID;
+        sk->sk_argument = attr_filter->max;
+        scankeys = lappend(scankeys, sk);
+
+        /* append new runtime filters to target node */
+        SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
+        sss->filters = list_concat(sss->filters, scankeys);

Review Comment: Can we merge the filters that target the same attno here?
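One way to read the merge suggestion above: when several range filters target the same attribute, their intersection is a single, tighter range, so the scan evaluates fewer keys per row. The following is a minimal sketch under that assumption. `RangeFilter`, `merge_range_filters`, and `range_filter_is_empty` are hypothetical names and are not the PR's `AttrFilter` or `ScanKeyData`; Bloom filters on the same attribute would need separate handling, which is not shown.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for a pushed-down range filter (inclusive bounds). */
typedef struct RangeFilter
{
    int     attno;
    int64_t min;
    int64_t max;
} RangeFilter;

/*
 * Merge filters in place so that each attno appears once, holding the
 * intersection of all ranges given for it. The first occurrence of each
 * attno keeps its position. Returns the number of filters that remain.
 */
size_t
merge_range_filters(RangeFilter *filters, size_t n)
{
    size_t out = 0;

    for (size_t i = 0; i < n; i++)
    {
        size_t j;

        /* look for an already-kept filter on the same attribute */
        for (j = 0; j < out; j++)
        {
            if (filters[j].attno == filters[i].attno)
                break;
        }

        if (j == out)
        {
            /* first filter seen for this attno: keep it */
            filters[out++] = filters[i];
            continue;
        }

        /* intersect: raise the lower bound, lower the upper bound */
        if (filters[i].min > filters[j].min)
            filters[j].min = filters[i].min;
        if (filters[i].max < filters[j].max)
            filters[j].max = filters[i].max;
    }
    return out;
}

/* An empty intersection means no row can satisfy all the filters. */
bool
range_filter_is_empty(const RangeFilter *f)
{
    return f->min > f->max;
}
```

For example, merging `[10, 100]`, `[50, 200]` and `[60, 90]` on the same attribute leaves `[60, 90]`. An empty result such as `[0, 5]` intersected with `[10, 20]` would let the scan skip every row.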