ankurdave opened a new pull request #29753:
URL: https://github.com/apache/spark/pull/29753


   ### What changes were proposed in this pull request?
   
   When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, 
`numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` 
is false. This correctly prevents the map from growing, but `canGrowArray` 
incorrectly remains true. Therefore the map keeps accepting new keys and 
exceeds its growth threshold. If we attempt to spill the map in this state, the 
UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By 
this point the task has typically consumed all available memory, so the 
allocation of the new pointer array is likely to fail.
   
   This PR fixes the issue by setting `canGrowArray` to false in this case. 
This prevents the map from accepting new elements when it cannot grow to 
accommodate them.
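
   Concretely, the growth check in `BytesToBytesMap` ends up with roughly the following shape after the fix. This is a paraphrased sketch based on the description above, not the exact upstream diff; the names (`numKeys`, `growthThreshold`, `canGrowArray`, `MAX_CAPACITY`, `growAndRehash()`) follow the existing code.
   
   ```java
   // Paraphrased sketch of the growth path; see BytesToBytesMap for the real code.
   if (numKeys >= growthThreshold) {
     if (longArray.size() / 2 < MAX_CAPACITY) {
       try {
         growAndRehash();  // double the pointer array and rehash existing entries
       } catch (SparkOutOfMemoryError oom) {
         canGrowArray = false;  // could not allocate a bigger array; stop accepting keys
       }
     } else {
       // New in this PR: the map is already at MAX_CAPACITY and cannot grow,
       // so stop accepting new keys instead of exceeding the growth threshold.
       canGrowArray = false;
     }
   }
   ```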
   
   ### Why are the changes needed?
   
   Without this change, hash aggregations will fail when the number of groups 
per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), 
and when the grouping aggregation is the only memory-consuming operator in its 
stage.
   
   For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` 
fails when `tbl` contains 1 billion distinct values and when 
`spark.sql.shuffle.partitions=1`.
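
   A driver program of the following shape reproduces the failure (a hypothetical sketch, not part of this PR; it assumes a deployment where the single reduce task has enough memory to build the map up to `MAX_CAPACITY`):
   
   ```java
   import org.apache.spark.sql.SparkSession;
   
   // Hypothetical repro sketch: 1 billion distinct ids aggregated by one task.
   public class MaxCapacityRepro {
     public static void main(String[] args) {
       SparkSession spark = SparkSession.builder()
           .appName("BytesToBytesMap MAX_CAPACITY repro")
           .config("spark.sql.shuffle.partitions", "1")  // one reduce task sees all groups
           .getOrCreate();
   
       spark.range(1_000_000_000L).createOrReplaceTempView("tbl");
   
       // Without this PR, the final aggregation fails once the single task's
       // BytesToBytesMap exceeds its growth threshold at MAX_CAPACITY.
       spark.sql("SELECT COUNT(DISTINCT id) FROM tbl").show();
   
       spark.stop();
     }
   }
   ```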
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails after 8.5 minutes; with this PR, it passes in 1.5 minutes.
   
   ```java
   public abstract class AbstractBytesToBytesMapSuite {
     // ...
     @Test
     public void respectGrowthThresholdAtMaxCapacity() {
       TestMemoryManager memoryManager2 =
           new TestMemoryManager(
               new SparkConf()
                   .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
                   .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
                   .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
                   .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
       TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
       final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
       final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);
   
       try {
         // Insert keys into the map until it stops accepting new keys.
         for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
           if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
           final long[] value = new long[]{i};
           BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
           Assert.assertFalse(loc.isDefined());
           boolean success =
               loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
           if (!success) break;
         }
   
         // The map should grow to its max capacity.
         long capacity = map.getArray().size() / 2;
         Assert.assertEquals(BytesToBytesMap.MAX_CAPACITY, capacity);
   
         // The map should stop accepting new keys once it has reached its growth
         // threshold, which is half the max capacity.
         Assert.assertEquals(BytesToBytesMap.MAX_CAPACITY / 2, map.numKeys());
       } finally {
         map.free();
       }
     }
   }
   ```

