hanover-fiste opened a new pull request #31965:
URL: https://github.com/apache/spark/pull/31965


   **What changes were proposed in this pull request?**
   The proposed changes increase the accuracy of JDBCRelation's stride calculation, as outlined in SPARK-34843: https://issues.apache.org/jira/browse/SPARK-34843
   
   In summary: 
   
   Currently, in JDBCRelation (line 123), the stride size is calculated as follows:
   
   `val stride: Long = upperBound / numPartitions - lowerBound / numPartitions`
   
   Because truncation happens on both divisions, the stride size can fall short of what it should be. This can create a large gap between the provided upper bound and the actual start of the last partition, as the example below shows.
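   To make the truncation concrete, here is a small standalone example (the bounds are made up purely for illustration):
   
   ```scala
   object StrideTruncationDemo extends App {
     // Made-up bounds, chosen so both divisions truncate.
     val lowerBound = 1L
     val upperBound = 10L
     val numPartitions = 4L
   
     // Current formula: each division truncates before the subtraction.
     val stride = upperBound / numPartitions - lowerBound / numPartitions
     // 10 / 4 - 1 / 4 = 2 - 0 = 2, while the exact stride is 9 / 4 = 2.25.
   
     // With stride 2, the internal boundaries land at 3, 5, and 7, so the
     // last partition must cover [7, 10]: a span of 3 versus 2 for the others.
     val lastPartitionStart = lowerBound + (numPartitions - 1) * stride
     println(s"stride = $stride, last partition spans ${upperBound - lastPartitionStart} units")
   }
   ```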
   
   I'm proposing a different formula that doesn't truncate too early and that maintains accuracy using fixed-point decimals (see the sketch below). This helps tremendously with the size of the last partition, a problem that is amplified further when the data is skewed in that direction. In a real-life test, I've seen a 27% increase in performance from this more precise stride alignment. The reason for fixed-point decimals rather than floating-point is that floating-point values cannot represent every decimal exactly; the error may seem small, but it can shift the midpoint, and depending on how granular the data is, that can translate to quite a difference. I'm striving to make the partitioning as accurate as possible, within reason.
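   For illustration, here is a minimal sketch of the fixed-point idea; the helper names and the HALF_UP rounding are my own assumptions for this example, not necessarily what the patch does verbatim:
   
   ```scala
   // Sketch only. scala.math.BigDecimal defaults to MathContext.DECIMAL128,
   // so the single division below keeps far more precision than a Double
   // (which cannot represent every large Long exactly) and never truncates
   // early the way two separate integer divisions do.
   def decimalStride(lowerBound: Long, upperBound: Long, numPartitions: Int): BigDecimal =
     (BigDecimal(upperBound) - BigDecimal(lowerBound)) / BigDecimal(numPartitions)
   
   // Each boundary is rounded once, at the end, so error never accumulates
   // across partitions.
   def boundary(lowerBound: Long, stride: BigDecimal, i: Int): Long =
     (BigDecimal(lowerBound) + stride * i)
       .setScale(0, BigDecimal.RoundingMode.HALF_UP)
       .toLong
   ```
   
   For example, with lowerBound = 1, upperBound = 10, and 4 partitions, decimalStride returns 2.25 and the boundaries land at 3, 6, and 8 instead of 3, 5, and 7, pulling the start of the last partition much closer to where it should be.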
   
   Lastly, since the last partition's predicate is determined by how the strides align starting from the lower bound (plus one stride), skew can be introduced that makes the last partition larger than the first. Therefore, after calculating a more precise stride size, I've also introduced logic to move the first partition's predicate (which is an offset from the lower bound) to a position that closely matches the last partition's offset from the upper bound. This makes the first and last partitions more evenly distributed relative to each other and shrinks the last task, which would otherwise be the largest. A rough sketch of the idea follows.
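   Conceptually, the balancing step looks something like this (a rough sketch with illustrative names and made-up bounds, not the patch itself):
   
   ```scala
   object BalancedBoundsDemo extends App {
     val lowerBound = 0L
     val upperBound = 100L
     val numPartitions = 8L
   
     val range = upperBound - lowerBound
     val stride = range / numPartitions            // truncating division: 12
     val leftover = range - stride * numPartitions // 100 - 8 * 12 = 4
     val shift = leftover / 2                      // give half of it to the first partition
   
     // Internal boundary i, for i in 1 to numPartitions - 1.
     def boundary(i: Long): Long = lowerBound + shift + stride * i
   
     // First partition: column < boundary(1)                  -> width 14
     // Last partition:  column >= boundary(numPartitions - 1) -> width 100 - 86 = 14
     // Without the shift, those widths would be 12 and 16.
     println((1L until numPartitions).map(boundary).mkString(", "))
   }
   ```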
   
   **Why are the changes needed?**
   The current implementation is inaccurate and can lead to the last task/partition running much longer than the previous tasks. You can therefore end up with a single node/core running for an extended period while the other nodes/cores sit idle.
   
   
   **Does this PR introduce _any_ user-facing change?**
   No. I suspect some users will simply see a performance increase. As stated above, when we ran our code against a build with this change in place, we saw a 27% increase in performance.
   
   
   **How was this patch tested?**
   I've added two new unit tests. I also needed to update one existing unit test; comparing the before and after shows better alignment of the partitioning under the new implementation. Given that the lower partition's predicate is exclusive and the upper's is inclusive, the offset of the lower was 3 days while the offset of the upper was 6 days: that's potentially twice the amount of data in the upper partition (and it could be much more, depending on how the user's data is distributed).
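   For concreteness, here is a hypothetical version of that asymmetry (the dates are made up, not the actual values from the test):
   
   ```scala
   // Hypothetical predicates over one month of data, purely illustrative.
   val firstWhereClause = "date < '2020-01-04' or date is null" // 3 days past the lower bound
   val lastWhereClause  = "date >= '2020-01-25'"                // 6 days before the upper bound
   // With uniformly distributed data, the last partition scans roughly
   // twice as many rows as the first.
   ```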
   
   Other unit tests that use timestamps and two partitions have maintained their midpoints.

