Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-08-31 Thread via GitHub


github-actions[bot] closed pull request #16319: feat: use spawned tasks to 
reduce call stack depth and avoid busy waiting
URL: https://github.com/apache/datafusion/pull/16319


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-08-22 Thread via GitHub


github-actions[bot] commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-3216113012

   Thank you for your contribution. Unfortunately, this pull request is stale 
because it has been open 60 days with no activity. Please remove the stale 
label or comment or this will be closed in 7 days.





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2963849416

   I did a second identical run because those results seemed just too good to be true to me. This is much closer to what I was expecting: more or less status quo. That does mean I'm back to getting wildly differing results which I can't really explain. This is on an old repurposed PowerEdge R730 running an Ubuntu 24.04 VM on ESXi. It's the only VM on the machine, so it can't be noisy neighbors. Maybe the hardware is just starting to get flaky.
   
   
   
   Comparing baseline and branch
   
   Benchmark clickbench_extended.json
   
   │ Query    │    baseline │      branch │    Change │
   │ QQuery 0 │  2802.58 ms │  2835.58 ms │ no change │
   │ QQuery 1 │  1267.40 ms │  1254.16 ms │ no change │
   │ QQuery 2 │  2425.18 ms │  2431.64 ms │ no change │
   │ QQuery 3 │  1049.51 ms │  1096.84 ms │ no change │
   │ QQuery 4 │  2799.22 ms │  2812.48 ms │ no change │
   │ QQuery 5 │ 33146.06 ms │ 32438.14 ms │ no change │
   │ QQuery 6 │  3581.51 ms │  3608.01 ms │ no change │
   │ QQuery 7 │  4244.94 ms │  4316.87 ms │ no change │
   │ QQuery 8 │  1549.39 ms │  1566.28 ms │ no change │

   │ Benchmark Summary       │            │
   │ Total Time (baseline)   │ 52865.81ms │
   │ Total Time (branch) │ 52360.00ms │
   │ Average Time (baseline) │  5873.98ms │
   │ Average Time (branch)   │  5817.78ms │
   │ Queries Faster  │  0 │
   │ Queries Slower  │  0 │
   │ Queries with No Change  │  9 │
   │ Queries with Failure│  0 │
   
   Benchmark clickbench_partitioned.json
   
   │ Query    │    baseline │      branch │    Change │
   │ QQuery 0 │28.51 ms │28.65 ms │ no change │
   │ QQuery 1 │66.92 ms │67.47 ms │ no change │
   │ QQuery 2 │   157.60 ms │   158.95 ms │ no change │
   │ QQuery 3 │   149.93 ms │   149.32 ms │ no change │
   │ QQuery 4 │  1168.56 ms │  1192.96 ms │ no change │
   │ QQuery 5 │  1513.84 ms │  1522.57 ms │ no change │
   │ QQuery 6 │40.48 ms │41.31 ms │ no change │
   │ QQuery 7 │82.32 ms │77.97 ms │ +1.06x faster │
   │ QQuery 8 │  1821.87 ms │  1835.17 ms │ no change │
   │ QQuery 9 │  2179.03 ms │  2154.90 ms │ no change │
   │ QQuery 10│   535.16 ms │   531.71 ms │ no change │
   │ QQuery 11│   604.14 ms │   595.35 ms │ no change │
   │ QQuery 12│  1697.18 ms │  1636.90 ms │ no change │
   │ QQuery 13│  2807.76 ms │  2802.55 ms │ no change │
   │ QQuery 14│  1569.36 ms │  1613.79 ms │ no change │
   │ QQuery 15│  1627.10 ms │  1627.11 ms │ no change │
   │ QQuery 16│  3269.69 ms │  3315.99 ms │ no change │
   │ QQuery 17│  2856.85 ms │  2846.55 ms │ no change │
   │ QQuery 18│  6091.55 ms │  5958.27 ms │ no change │
   │ QQuery 19│   140.87 ms │   138.93 ms │ no change │
   │ QQuery 20│  1887.14 ms │  1893.19 ms │ no change │
   │ QQuery 21│  2269.57 ms │  2281.36 ms │ no change │
   │ QQuery 22│  3931.31 ms │  3954.27 ms │ no change │
   │ QQuery 23│ 15449.02 ms │ 15582.96 ms │ no change │
   │ QQuery 24│   854.06 ms │   869.83 ms │ no change │
   │ QQuery 25│   737.82 ms │   728.33 ms │ no change │
   │ QQuery 26│   958.68 ms │   991.79 ms │ no change │
   │ QQuery 27│  2704.35 ms │  2695.40 ms │ no change │
   │ QQuery 28│ 22518.40 ms │ 22488.05 ms │ no change │
   │ QQuery 29│  1172.40 ms │  1178.97 ms │ no change │
   │ QQuery 30│  1647.91 ms │  1643.26 ms │ no change │
   │ QQuery 31│  1716.99 ms │  1726.10 ms │ no change │
   │ QQuery 32│  5534.67 ms │  5685.19 ms │ no change │
   │ QQuery 33│  6824.90 ms │  6887.88 ms │ no change │
   │ QQuery 34│  7005.46 ms │  7040.82 ms │ no change │
   │ QQuery 35│  2293.87 ms │  2313.10 ms │ no change │
   │ QQuery 36│   165.05 ms │   163.79 ms │ no change │
   │ QQuery 37│87.79 ms │84.71 ms │ no change │
   │ QQuery 38│   165.88 ms │   171.22 ms │ no change │
   │ QQuery 39│   272.32 ms │   275.14 ms │ no change │
   │ QQuery 40│77.26 ms │78.16 ms │ no change │
   │ QQuery 41│67.84 ms │67.96 ms │ no change │
   │ QQuery 42│61.56

Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2963861479

   🤦‍♂️ That's not very useful now, is it? I need a better machine to test on.
   
   min/avg/max
   ```
   1887.14 / 2104.13 ±982.87 / 8971.27 ms │  1893.19 / 1949.39 ±31.69 / 
2057.54 ms
   ```





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-11 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2961984440

   @alamb @Dandandan I started a benchmark run with 50 iterations and the TPCH changes yesterday evening and checked the results this morning... 😌 It would be great if someone else could confirm.
   
   Long story short: in my environment, at least, 5 iterations is just too few. Especially in the first 5-10 iterations I see far too much variability for the runs to be useful. It stabilizes later on.
   
   I wonder if it would be a good idea to modify the benchmark code to always do a number of warmup iterations before we actually start measuring.
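   To make the warmup idea concrete, here is a minimal sketch of discarding the first iterations before computing statistics. This is illustrative only; `summarize` and the sample numbers are invented, not DataFusion's benchmark code.

   ```rust
   // Hypothetical sketch of the warmup idea: drop the first `warmup`
   // iterations before summarizing, so cold caches and first-touch IO
   // do not skew the comparison.
   fn summarize(samples_ms: &[f64], warmup: usize) -> (f64, f64, f64) {
       let measured = &samples_ms[warmup.min(samples_ms.len() - 1)..];
       let mean = measured.iter().sum::<f64>() / measured.len() as f64;
       let min = measured.iter().copied().fold(f64::INFINITY, f64::min);
       let max = measured.iter().copied().fold(f64::NEG_INFINITY, f64::max);
       (min, mean, max)
   }

   fn main() {
       // The first iterations are slow outliers, as observed in the runs above.
       let runs = [8971.27, 4102.55, 1887.14, 1893.19, 1910.42];
       let (min, mean, max) = summarize(&runs, 2);
       println!("min/avg/max after warmup: {min:.2} / {mean:.2} / {max:.2} ms");
   }
   ```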
   
   
   
   ```
   Comparing baseline and branch
   
   Benchmark clickbench_extended.json
   
   │ Query    │    baseline │      branch │    Change │
   │ QQuery 0 │  2841.73 ms │  2868.70 ms │ no change │
   │ QQuery 1 │  1254.60 ms │  1259.50 ms │ no change │
   │ QQuery 2 │  2444.16 ms │  2417.82 ms │ no change │
   │ QQuery 3 │  1075.16 ms │  1046.80 ms │ no change │
   │ QQuery 4 │  2759.06 ms │  2800.21 ms │ no change │
   │ QQuery 5 │ 32991.04 ms │ 33376.96 ms │ no change │
   │ QQuery 6 │  3591.27 ms │  3577.60 ms │ no change │
   │ QQuery 7 │  4174.48 ms │  4260.27 ms │ no change │
   │ QQuery 8 │  1562.72 ms │  1541.47 ms │ no change │

   │ Benchmark Summary       │            │
   │ Total Time (baseline)   │ 52694.22ms │
   │ Total Time (branch) │ 53149.33ms │
   │ Average Time (baseline) │  5854.91ms │
   │ Average Time (branch)   │  5905.48ms │
   │ Queries Faster  │  0 │
   │ Queries Slower  │  0 │
   │ Queries with No Change  │  9 │
   │ Queries with Failure│  0 │
   
   Benchmark clickbench_partitioned.json
   
   │ Query    │    baseline │      branch │    Change │
   │ QQuery 0 │28.47 ms │28.17 ms │ no change │
   │ QQuery 1 │66.76 ms │68.23 ms │ no change │
   │ QQuery 2 │   158.80 ms │   158.78 ms │ no change │
   │ QQuery 3 │   156.06 ms │   147.87 ms │ +1.06x faster │
   │ QQuery 4 │  1169.02 ms │  1170.60 ms │ no change │
   │ QQuery 5 │  1498.30 ms │  1546.96 ms │ no change │
   │ QQuery 6 │41.48 ms │41.42 ms │ no change │
   │ QQuery 7 │80.51 ms │79.37 ms │ no change │
   │ QQuery 8 │  1831.67 ms │  1817.91 ms │ no change │
   │ QQuery 9 │  2152.33 ms │  2190.01 ms │ no change │
   │ QQuery 10│   531.87 ms │   545.53 ms │ no change │
   │ QQuery 11│   592.01 ms │   603.65 ms │ no change │
   │ QQuery 12│  1651.35 ms │  1679.20 ms │ no change │
   │ QQuery 13│  2783.70 ms │  2847.50 ms │ no change │
   │ QQuery 14│  1589.41 ms │  1597.90 ms │ no change │
   │ QQuery 15│  1620.37 ms │  1629.00 ms │ no change │
   │ QQuery 16│  3332.63 ms │  3354.37 ms │ no change │
   │ QQuery 17│  2864.46 ms │  2850.38 ms │ no change │
   │ QQuery 18│  6226.51 ms │  5993.04 ms │ no change │
   │ QQuery 19│   138.61 ms │   140.21 ms │ no change │
   │ QQuery 20│  1892.60 ms │  1910.23 ms │ no change │
   │ QQuery 21│  2292.32 ms │  2325.51 ms │ no change │
   │ QQuery 22│  3944.51 ms │  4020.94 ms │ no change │
   │ QQuery 23│ 15570.73 ms │ 15691.75 ms │ no change │
   │ QQuery 24│   862.29 ms │   875.83 ms │ no change │
   │ QQuery 25│   727.49 ms │   721.70 ms │ no change │
   │ QQuery 26│   975.89 ms │   986.73 ms │ no change │
   │ QQuery 27│  2697.31 ms │  2769.85 ms │ no change │
   │ QQuery 28│ 22911.95 ms │ 22695.04 ms │ no change │
   │ QQuery 29│  1172.68 ms │  1173.51 ms │ no change │
   │ QQuery 30│  1634.30 ms │  1637.76 ms │ no change │
   │ QQuery 31│  1718.66 ms │  1717.93 ms │ no change │
   │ QQuery 32│  5527.03 ms │  5810.12 ms │  1.05x slower │
   │ QQuery 33│  6763.33 ms │  6654.45 ms │ no change │
   │ QQuery 34│  7016.63 ms │  6971.15 ms │ no change │
   │ QQuery 35│  2308.46 ms │  2308.33 ms │ no change │
   │ QQuery 36│   167.09 ms │   170.41 ms │ no change │
   │ QQuery 37│86.96 ms │87.12 ms │ no change │
   │ QQuery 38│   167.31 ms │   172.42 ms │ no change │
   │ QQuery 39│   268.56 ms │   269.12 ms │ no change │
   │ QQuery 40│75.72 ms │76.78 ms
   ```

Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2959379449

   https://github.com/apache/datafusion/pull/16357 might be relevant here. I 
was testing on an old system with spinning disks. Going to retest with more 
iterations and this change to make sure I'm not measuring noise.





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2959270067

   Googling a bit, I'm starting to get the impression that I shouldn't think of Tokio tasks as being as lightweight as coroutines in some other ecosystems.





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2959180410

   I think I'll put this one in draft for now. The benchmark results say "needs more work and performance analysis" to me.
   
   
   
   ```
   Comparing baseline and branch
   
   Benchmark clickbench_extended.json
   
   │ Query    │    baseline │      branch │    Change │
   │ QQuery 0 │  2827.60 ms │  2861.15 ms │ no change │
   │ QQuery 1 │  1275.24 ms │  1271.33 ms │ no change │
   │ QQuery 2 │  2529.29 ms │  2542.19 ms │ no change │
   │ QQuery 3 │  1082.87 ms │  1105.09 ms │ no change │
   │ QQuery 4 │  2774.37 ms │  2834.65 ms │ no change │
   │ QQuery 5 │ 32714.77 ms │ 32948.06 ms │ no change │
   │ QQuery 6 │  3662.78 ms │  3610.34 ms │ no change │
   │ QQuery 7 │  4411.89 ms │  4412.35 ms │ no change │
   │ QQuery 8 │  1622.98 ms │  1641.82 ms │ no change │

   │ Benchmark Summary       │            │
   │ Total Time (baseline)   │ 52901.80ms │
   │ Total Time (branch) │ 53226.99ms │
   │ Average Time (baseline) │  5877.98ms │
   │ Average Time (branch)   │  5914.11ms │
   │ Queries Faster  │  0 │
   │ Queries Slower  │  0 │
   │ Queries with No Change  │  9 │
   │ Queries with Failure│  0 │
   
   Benchmark clickbench_partitioned.json
   
   │ Query    │    baseline │      branch │    Change │
   │ QQuery 0 │29.01 ms │28.68 ms │ no change │
   │ QQuery 1 │74.78 ms │69.50 ms │ +1.08x faster │
   │ QQuery 2 │   159.91 ms │   167.57 ms │ no change │
   │ QQuery 3 │   183.96 ms │   188.96 ms │ no change │
   │ QQuery 4 │  1132.22 ms │  1143.89 ms │ no change │
   │ QQuery 5 │  1439.94 ms │  1548.88 ms │  1.08x slower │
   │ QQuery 6 │46.06 ms │43.16 ms │ +1.07x faster │
   │ QQuery 7 │86.63 ms │86.83 ms │ no change │
   │ QQuery 8 │  1840.94 ms │  1836.42 ms │ no change │
   │ QQuery 9 │  2142.35 ms │  2141.25 ms │ no change │
   │ QQuery 10│   562.25 ms │   551.77 ms │ no change │
   │ QQuery 11│   612.86 ms │   647.94 ms │  1.06x slower │
   │ QQuery 12│  1692.83 ms │  1651.23 ms │ no change │
   │ QQuery 13│  2755.19 ms │  2699.53 ms │ no change │
   │ QQuery 14│  1571.17 ms │  1620.12 ms │ no change │
   │ QQuery 15│  1619.18 ms │  1649.79 ms │ no change │
   │ QQuery 16│  3235.65 ms │  3235.25 ms │ no change │
   │ QQuery 17│  2820.07 ms │  2857.36 ms │ no change │
   │ QQuery 18│  6210.01 ms │  5920.66 ms │ no change │
   │ QQuery 19│   159.65 ms │   158.77 ms │ no change │
   │ QQuery 20│  2013.41 ms │  1929.52 ms │ no change │
   │ QQuery 21│  2378.85 ms │  2307.42 ms │ no change │
   │ QQuery 22│  4003.49 ms │  4006.98 ms │ no change │
   │ QQuery 23│ 15582.81 ms │ 15369.75 ms │ no change │
   │ QQuery 24│   911.63 ms │   875.75 ms │ no change │
   │ QQuery 25│   749.19 ms │   748.95 ms │ no change │
   │ QQuery 26│  1002.11 ms │   990.34 ms │ no change │
   │ QQuery 27│  2852.13 ms │  2868.56 ms │ no change │
   │ QQuery 28│ 22829.31 ms │ 22958.81 ms │ no change │
   │ QQuery 29│  1192.45 ms │  1191.21 ms │ no change │
   │ QQuery 30│  1658.09 ms │  1697.88 ms │ no change │
   │ QQuery 31│  1727.92 ms │  1733.49 ms │ no change │
   │ QQuery 32│  5707.58 ms │  5705.04 ms │ no change │
   │ QQuery 33│  6534.26 ms │  6312.33 ms │ no change │
   │ QQuery 34│  6814.26 ms │  7162.10 ms │  1.05x slower │
   │ QQuery 35│  2301.72 ms │  2423.41 ms │  1.05x slower │
   │ QQuery 36│   168.31 ms │   162.38 ms │ no change │
   │ QQuery 37│88.48 ms │87.41 ms │ no change │
   │ QQuery 38│   178.01 ms │   175.14 ms │ no change │
   │ QQuery 39│   281.96 ms │   271.88 ms │ no change │
   │ QQuery 40│76.88 ms │74.45 ms │ no change │
   │ QQuery 41│68.88 ms │72.43 ms │  1.05x slower │
   │ QQuery 42│63.90 ms │68.55 ms │  1.07x slower │

   │ Benchmark Summary       │             │
   │ Total Time (baseline)   │ 107560.26ms │
   │ Total Time (branch)     │ 107441.36ms
   ```

Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-10 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2959103958

   > In what situations would these changes lead to better performance? I.e. why is query 28 ~1.10x faster?
   
   The jury is still out on whether it makes sense or not, but I can explain my theoretical reasoning. Apologies up front if I'm writing too pedantically; I'm just trying to explain things as clearly as I can. I'm not a database guy, so this may sound hopelessly naive.
   
   The first observation is that yielding all the way to the runtime in Tokio requires stack unwinding; that's the nature of stackless tasks. The deeper your call stack, the more call frames you need to unwind and the more calls you need to redo to get back to the yield point. I've been trying to find information on whether the Rust compiler does some magic to avoid this, but as far as I can tell it does not. I did find hints that it optimizes nested async function calls, but it will not do so for nested `dyn Stream` `poll_next` calls. That makes sense; an AOT compiler will typically not be able to optimize across virtual function calls.
   The consequence is that yielding to the runtime can have a non-trivial cost. The other PR you're reviewing is an extreme example of that.
   
   The second observation is that DataFusion's volcano model naturally leads to fairly deep call stacks. The tree of execution plans results in a tree of streams, and a parent stream's `poll_next` will often directly call `poll_next` on a child. With one of these deep call stacks, yielding from the deepest point potentially means unwinding the whole thing and coming back. This is already mitigated a bit when volcano-breaking operators like repartition are present in the plan. The deepest call stacks are seen when running with `target_partitions = 1`.
   
   Third, pipeline-breaking operators are intrinsically two-phase: first they collect, then they emit. There's a gray area of course, but I'm talking about the classic ones like single aggregation. While a pipeline-breaking stream is in its collect phase, it can be 100% sure that it will not have any new data for its `poll_next` caller until that phase completes. There's really not much point in telling the caller `Poll::Pending` over and over again, because that leads to busy waiting. But you do still want to yield to the runtime periodically so as not to squat the Tokio executor threads.
   So there are situations with potentially long phases where any yield to the caller is redundant (there's no new info), but you still need to yield for cooperative scheduling.
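   The collect/emit shape described above can be sketched as a small state machine. Names like `TwoPhaseSort` and `Step` are illustrative inventions, not DataFusion's actual types; the point is only that the collect phase yields cooperatively without claiming progress to the caller.

   ```rust
   // Sketch of the "collect then emit" shape: during the collect phase
   // the operator has nothing new for its caller, but it still hands
   // control back every `budget` items so it does not hog a thread.
   enum Step {
       Yielded,   // gave control back; caller learns nothing new
       Item(i32), // emit phase produced a value
       Done,
   }

   struct TwoPhaseSort {
       input: std::vec::IntoIter<i32>,
       collected: Vec<i32>,
       emitting: bool,
       budget: usize,
   }

   impl TwoPhaseSort {
       fn step(&mut self) -> Step {
           if !self.emitting {
               for _ in 0..self.budget {
                   match self.input.next() {
                       Some(v) => self.collected.push(v),
                       None => {
                           // Collect phase complete: sort, switch to emit.
                           self.collected.sort();
                           self.emitting = true;
                           return self.step();
                       }
                   }
               }
               // Budget spent: yield cooperatively, no progress to report.
               return Step::Yielded;
           }
           if self.collected.is_empty() {
               Step::Done
           } else {
               Step::Item(self.collected.remove(0))
           }
       }
   }

   fn main() {
       let mut sort = TwoPhaseSort {
           input: vec![3, 1, 2].into_iter(),
           collected: Vec::new(),
           emitting: false,
           budget: 2,
       };
       let mut yields = 0;
       loop {
           match sort.step() {
               Step::Yielded => yields += 1,
               Step::Item(v) => println!("emit {v}"),
               Step::Done => break,
           }
       }
       println!("cooperative yields during collect: {yields}");
   }
   ```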
   
   Combining all that, I think you're looking for deep query plans with nested pipeline breakers. In a different PR someone pointed me to this query: https://github.com/apache/datafusion/blob/bf7859e5d9dbdc260674f5333a5cafa9c6e7bc12/datafusion/sqllogictest/test_files/window.slt#L3020
   The nested sorts in the physical plan are something of a worst-case scenario. At the deepest sort you have a 12-level-deep call stack that gets reactivated for every yield. If instead we chop this into 6 chained spawned tasks, you get 6 much shallower call stacks. Of those tasks only one will be active; the others will be inert until there's actually something to do.
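   As a rough illustration of why deep chains matter, here is a synchronous stand-in (plain function calls instead of Tokio streams; all names are invented for the sketch) showing that every pull from the top of an N-deep chain re-enters all N operator frames:

   ```rust
   // Each wrapper re-enters its child on every pull, so a single pull
   // from the top traverses the entire operator stack, much like a
   // yield point at the bottom of a deep Stream chain forces unwinding
   // and then re-descending all frames on the next poll.
   trait Op {
       fn pull(&mut self, frames: &mut usize) -> Option<i32>;
   }

   struct Leaf {
       remaining: i32,
   }

   impl Op for Leaf {
       fn pull(&mut self, frames: &mut usize) -> Option<i32> {
           *frames += 1; // the leaf's own call frame
           if self.remaining == 0 {
               None
           } else {
               self.remaining -= 1;
               Some(self.remaining)
           }
       }
   }

   struct PassThrough {
       child: Box<dyn Op>,
   }

   impl Op for PassThrough {
       fn pull(&mut self, frames: &mut usize) -> Option<i32> {
           *frames += 1; // one extra call frame per operator level
           self.child.pull(frames)
       }
   }

   fn main() {
       // Build a 12-deep chain, like the nested sorts mentioned above.
       let mut op: Box<dyn Op> = Box::new(Leaf { remaining: 3 });
       for _ in 0..11 {
           op = Box::new(PassThrough { child: op });
       }
       let mut frames = 0;
       op.pull(&mut frames);
       // Every single pull walks all 12 frames from the top down.
       println!("frames entered for one pull: {frames}");
   }
   ```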
   
   A second factor can be the data source. Filesystem streams tend to be always ready; others may not be. The more often the source returns pending, the more the overhead described above will show up, I think.
   
   All of this assumes, of course, that going up and back down the call stack has a non-trivial cost. Perhaps it's not significant enough to be measurable. I'm still figuring out how best to profile this stuff, so I'm afraid I don't have anything more fact-based to give you yet.
   
   Besides performance there's a small aesthetic aspect to this. I find that a stream that responds with
   `pending ready ready ready none`
   is more elegant than
   `pending pending pending pending ... pending ready ready ready none`
   The first one abstracts what's going on underneath better than the latter. But I understand that raw performance trumps aesthetics here.





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-10 Thread via GitHub


Dandandan commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2958776448

   (Or is it just benchmark noise?)





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-10 Thread via GitHub


Dandandan commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2958771895

   In what situations would these changes lead to better performance?
   I.e. why is query 28 ~1.10x faster?





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2956628323

   Looking at the `clickbench_partitioned` outliers: with respect to the code changes in this PR the two queries seem pretty similar, yet one has basically the opposite result of the other. What's interesting is that total time is actually lower for the run.
   
   Query 13: 1.13x slower
   ```
   SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM 'hits.parquet' WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10;
   SortExec: TopK(fetch=10), expr=[u@1 DESC], preserve_partitioning=[false]
     ProjectionExec: expr=[SearchPhrase@0 as SearchPhrase, count(alias1)@1 as u]
       AggregateExec: mode=Single, gby=[SearchPhrase@0 as SearchPhrase], aggr=[count(alias1)]
         AggregateExec: mode=Single, gby=[SearchPhrase@1 as SearchPhrase, UserID@0 as alias1], aggr=[]
           CoalesceBatchesExec: target_batch_size=8192
             FilterExec: SearchPhrase@1 !=
               DataSourceExec: file_groups={1 group: [[Users/pepijn/RustroverProjects/datafusion/benchmarks/data/hits.parquet:0..14779976446]]}, projection=[UserID, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != , pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 !=  OR  != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()]
   ```
   
   Query 28: 1.10x faster
   ```
   SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM 'hits.parquet' WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 10 ORDER BY l DESC LIMIT 25;
   SortExec: TopK(fetch=25), expr=[l@1 DESC], preserve_partitioning=[false]
     ProjectionExec: expr=[regexp_replace(hits.parquet.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))@0 as k, avg(character_length(hits.parquet.Referer))@1 as l, count(Int64(1))@2 as c, min(hits.parquet.Referer)@3 as min(hits.parquet.Referer)]
       CoalesceBatchesExec: target_batch_size=8192
         FilterExec: count(Int64(1))@2 > 10
           AggregateExec: mode=Single, gby=[regexp_replace(Referer@0, ^https?://(?:www\.)?([^/]+)/.*$, \1) as regexp_replace(hits.parquet.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))], aggr=[avg(character_length(hits.parquet.Referer)), count(Int64(1)), min(hits.parquet.Referer)]
             CoalesceBatchesExec: target_batch_size=8192
               FilterExec: Referer@0 !=
                 DataSourceExec: file_groups={1 group: [[Users/pepijn/RustroverProjects/datafusion/benchmarks/data/hits.parquet:0..14779976446]]}, projection=[Referer], file_type=parquet, predicate=Referer@14 != , pruning_predicate=Referer_null_count@2 != row_count@3 AND (Referer_min@0 !=  OR  != Referer_max@1), required_guarantees=[Referer not in ()]
   ```





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2956607445

   I had a look at `clickbench_extended`. I cannot explain the slowdown; those queries do not even use sorting or joins. The plan for the first one, for instance, is
   
   ```
   AggregateExec: mode=Final
     CoalescePartitionsExec
       AggregateExec: mode=Partial
         DataSourceExec:
   ```





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on code in PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#discussion_r2136197682


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1126,14 +1127,20 @@ impl ExecutionPlan for SortExec {
 Ok(Box::pin(RecordBatchStreamAdapter::new(
 self.schema(),
 futures::stream::once(async move {
-while let Some(batch) = input.next().await {
-let batch = batch?;
-topk.insert_batch(batch)?;
-if topk.finished {
-break;
+// Spawn a task the first time the stream is polled for the sort phase.
+// This ensures the consumer of the sort does not poll unnecessarily
+// while the sort is ongoing

Review Comment:
   I might have gotten this completely wrong, but that should not be the case. You're correct that preparing the stream potentially happens on a different thread, but what gets returned by the task is the stream itself, not the individual record batches. The produced stream should still be drained by the original task. What you're describing is what `RecordBatchReceiverStream` does, which is quite different and requires channels for the inter-thread communication.
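   The distinction can be sketched with plain threads as a stand-in for Tokio tasks (an analogy only; this assumes nothing about the actual DataFusion code):

   ```rust
   use std::sync::mpsc;
   use std::thread;

   // Pattern 1 (this PR's shape): the spawned worker only *prepares*
   // the result and hands the whole thing back; the caller then drains
   // it on its own thread, with no per-item channel traffic.
   // Pattern 2 (the RecordBatchReceiverStream-style shape): the worker
   // produces each item and sends it over a channel, so every item
   // crosses threads.
   fn main() {
       // Pattern 1: the worker returns the "stream" itself.
       let prepared: Vec<i32> = thread::spawn(|| (0..3).collect())
           .join()
           .unwrap();
       let drained: Vec<i32> = prepared.into_iter().collect(); // caller's thread

       // Pattern 2: items are produced remotely, one channel send each.
       let (tx, rx) = mpsc::channel();
       thread::spawn(move || {
           for i in 0..3 {
               tx.send(i).unwrap();
           }
       });
       let received: Vec<i32> = rx.iter().collect();

       assert_eq!(drained, received);
       println!("both patterns yield {drained:?}");
   }
   ```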






Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on code in PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#discussion_r2136240231


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1126,14 +1127,20 @@ impl ExecutionPlan for SortExec {
 Ok(Box::pin(RecordBatchStreamAdapter::new(
 self.schema(),
 futures::stream::once(async move {
-while let Some(batch) = input.next().await {
-let batch = batch?;
-topk.insert_batch(batch)?;
-if topk.finished {
-break;
+// Spawn a task the first time the stream is polled for the sort phase.
+// This ensures the consumer of the sort does not poll unnecessarily
+// while the sort is ongoing

Review Comment:
   I've added a test case that attempts to demonstrate that processing is 
deferred. If this looks ok to you I can add the same thing for the other 
touched code as well.
   
   I'm not sure how I can demonstrate the absence of multi-threading in a test 
case.
   
   Wrt comprehensibility, I have to admit I am still very much in the learning-as-I-go phase of using the futures crate. There might be a more elegant or straightforward way to express this construct.








Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on code in PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#discussion_r2136197682


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1126,14 +1127,20 @@ impl ExecutionPlan for SortExec {
 Ok(Box::pin(RecordBatchStreamAdapter::new(
 self.schema(),
 futures::stream::once(async move {
-while let Some(batch) = input.next().await {
-let batch = batch?;
-topk.insert_batch(batch)?;
-if topk.finished {
-break;
+// Spawn a task the first time the stream is polled 
for the sort phase.
+// This ensures the consumer of the sort does not poll 
unnecessarily
+// while the sort is ongoing

Review Comment:
   I might have gotten this completely wrong, but that should not be the case. 
You're correct that preparing the stream is potentially on a different thread, 
but what gets returned by the task is the stream itself, not the individual 
record batches. The stream should still be getting drained by the original 
task. What you're describing is what `RecordBatchReceiverStream` does which is 
quite different and requires channels for the inter-thread communication.






Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2956496642

   @alamb I've been trying to make sense of what to do with the benchmark 
results. They always seem to give me very mixed results when I run them locally 
(that's part of why I did the min/max/stddev thing, to try to get more 
insight). Some tests are slower but total and average time increase is 
marginal. Should I take a closer look at the 1.13 slower one?





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-09 Thread via GitHub


pepijnve commented on code in PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#discussion_r2136181096


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1126,14 +1127,20 @@ impl ExecutionPlan for SortExec {
 Ok(Box::pin(RecordBatchStreamAdapter::new(
     self.schema(),
     futures::stream::once(async move {
-        while let Some(batch) = input.next().await {
-            let batch = batch?;
-            topk.insert_batch(batch)?;
-            if topk.finished {
-                break;
+        // Spawn a task the first time the stream is polled for the sort phase.
+        // This ensures the consumer of the sort does not poll unnecessarily
+        // while the sort is ongoing

Review Comment:
   I intentionally tried to avoid that and I'm fairly sure it does, but I'll 
try to come up with a test that demonstrates it since it's a very important 
detail.
   
   These kinds of constructs have an Inceptiony quality to them, don't they. If 
I got my head wrapped around Futures and async correctly they're essentially 
interchangeable and inert until first polled.
   
   What should be happening here is that `stream::once` creates a stream 
consisting of a single element, which is to be produced by a future. The 
future in question is only polled the first time the stream is polled.
   
   That future is an async block which in its first poll spawns the task. In 
other words, the spawn is deferred until first poll. Then we await the spawned 
task, which is just polling the JoinHandle.
   
   Anyway, I don't want you to take my word for it. I'll get to work on a test 
case.






Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-08 Thread via GitHub


alamb commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2954372374

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and issue_16318
   
   Benchmark clickbench_extended.json
   
   Query    | HEAD        | issue_16318 | Change
   ---------|-------------|-------------|-------------
   QQuery 0 |  1764.58 ms |  1887.17 ms | 1.07x slower
   QQuery 1 |   697.84 ms |   737.88 ms | 1.06x slower
   QQuery 2 |  1371.85 ms |  1456.38 ms | 1.06x slower
   QQuery 3 |   679.75 ms |   706.70 ms | no change
   QQuery 4 |  1455.11 ms |  1441.33 ms | no change
   QQuery 5 | 15616.91 ms | 15838.54 ms | no change
   QQuery 6 |  2021.53 ms |  2092.32 ms | no change
   QQuery 7 |  2070.45 ms |  2255.10 ms | 1.09x slower
   QQuery 8 |   850.20 ms |   820.90 ms | no change

   Benchmark Summary          |
   ---------------------------|-----------
   Total Time (HEAD)          | 26528.21ms
   Total Time (issue_16318)   | 27236.34ms
   Average Time (HEAD)        |  2947.58ms
   Average Time (issue_16318) |  3026.26ms
   Queries Faster             |          0
   Queries Slower             |          4
   Queries with No Change     |          5
   Queries with Failure       |          0
   
   Benchmark clickbench_partitioned.json
   
   Query     | HEAD        | issue_16318 | Change
   ----------|-------------|-------------|--------------
   QQuery 0  |    15.98 ms |    14.96 ms | +1.07x faster
   QQuery 1  |    32.71 ms |    32.55 ms | no change
   QQuery 2  |    81.72 ms |    80.60 ms | no change
   QQuery 3  |    98.69 ms |    94.35 ms | no change
   QQuery 4  |   579.71 ms |   595.93 ms | no change
   QQuery 5  |   845.60 ms |   861.52 ms | no change
   QQuery 6  |    23.05 ms |    23.03 ms | no change
   QQuery 7  |    37.18 ms |    35.64 ms | no change
   QQuery 8  |   896.54 ms |   888.95 ms | no change
   QQuery 9  |  1154.45 ms |  1185.07 ms | no change
   QQuery 10 |   265.14 ms |   266.51 ms | no change
   QQuery 11 |   296.19 ms |   303.53 ms | no change
   QQuery 12 |   896.36 ms |   910.80 ms | no change
   QQuery 13 |  1212.33 ms |  1372.46 ms | 1.13x slower
   QQuery 14 |   833.77 ms |   849.28 ms | no change
   QQuery 15 |   810.40 ms |   825.57 ms | no change
   QQuery 16 |  1702.67 ms |  1754.50 ms | no change
   QQuery 17 |  1593.37 ms |  1622.95 ms | no change
   QQuery 18 |  3061.89 ms |  3090.11 ms | no change
   QQuery 19 |    80.71 ms |    84.37 ms | no change
   QQuery 20 |  1131.96 ms |  1171.56 ms | no change
   QQuery 21 |  1299.18 ms |  1355.14 ms | no change
   QQuery 22 |  2190.27 ms |  2289.42 ms | no change
   QQuery 23 |  8012.02 ms |  8214.48 ms | no change
   QQuery 24 |   467.12 ms |   479.98 ms | no change
   QQuery 25 |   384.56 ms |   405.53 ms | 1.05x slower
   QQuery 26 |   527.13 ms |   538.47 ms | no change
   QQuery 27 |  1590.25 ms |  1648.26 ms | no change
   QQuery 28 | 13842.27 ms | 12564.81 ms | +1.10x faster
   QQuery 29 |   533.53 ms |   522.59 ms | no change
   QQuery 30 |   803.36 ms |   816.08 ms | no change
   QQuery 31 |   863.54 ms |   862.31 ms | no change
   QQuery 32 |  2627.68 ms |  2692.79 ms | no change
   QQuery 33 |  3276.79 ms |  3356.72 ms | no change
   QQuery 34 |  3323.95 ms |  3383.36 ms | no change
   QQuery 35 |  1273.32 ms |  1281.73 ms | no change
   QQuery 36 |   122.83 ms |   122.15 ms | no change
   QQuery 37 |    55.54 ms |    57.40 ms | no change
   QQuery 38 |   124.07 ms |   123.23 ms | no change
   QQuery 39 |   197.18 ms |   195.97 ms | no change
   QQuery 40 |    48.39 ms |    49.25 ms | no change
   QQuery 41 |    43.34 ms |    44.45 ms | no change
   QQuery 42 |    38.56 ms |    38.53 ms | no change

   Benchmark Summary          |
   ---------------------------|-----------
   Total Time (HEAD)          | 57295.30ms
   Total Time (issue_16318)   | 57106.8

Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-08 Thread via GitHub


alamb commented on PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#issuecomment-2954341770

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr  2 16:34:16 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing issue_16318 (2a965dce60da14d772b9db4e4c5f6e388626b030) to 1daa5ed5cc51546904d45e23cc148601d973942a [diff](https://github.com/apache/datafusion/compare/1daa5ed5cc51546904d45e23cc148601d973942a..2a965dce60da14d772b9db4e4c5f6e388626b030)
   Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] feat: use spawned tasks to reduce call stack depth and avoid busy waiting [datafusion]

2025-06-08 Thread via GitHub


alamb commented on code in PR #16319:
URL: https://github.com/apache/datafusion/pull/16319#discussion_r2134861645


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1126,14 +1127,20 @@ impl ExecutionPlan for SortExec {
 Ok(Box::pin(RecordBatchStreamAdapter::new(
     self.schema(),
     futures::stream::once(async move {
-        while let Some(batch) = input.next().await {
-            let batch = batch?;
-            topk.insert_batch(batch)?;
-            if topk.finished {
-                break;
+        // Spawn a task the first time the stream is polled for the sort phase.
+        // This ensures the consumer of the sort does not poll unnecessarily
+        // while the sort is ongoing

Review Comment:
   I may not fully understand this change, but I believe it will start 
generating input as soon as `execute` is called, rather than waiting for the 
first poll... 
   
   Another potential problem is that it will disconnect the production of 
batches from their consumption -- in other words, by design I think it 
will potentially produce batches on a different thread than the one that 
consumes them


