sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r776450291



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
   That sounds like a good idea; however, `TaskMemoryManager` does not expose an API to fetch the available execution memory. `MemoryManager.getExecutionMemoryUsageForTask` does exist, but unfortunately `MemoryManager` is not accessible at the point where the map is built.
   
https://github.com/apache/spark/blob/4c806728eb955c07f7e095a6ff085a50d2eef806/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L216
   
   Furthermore, rebuilding the map takes only a small fraction of the total query execution time.
   I observed the following timings while running a test with this feature:
   
   <table>
      <tbody>
          <tr>
            <td>Stream side</td>
            <td>300M rows</td>
         </tr>
         <tr>
            <td>Build side</td>
            <td>90M rows</td>
         </tr>
         <tr>
            <td>Rebuilding the map</td>
            <td>4 seconds (diff from the logs)</td>
         </tr>
         <tr>
            <td>Total query execution time with optimization enabled</td>
            <td>3.9 minutes</td>
         </tr>
         <tr>
            <td>Total query execution time with optimization disabled</td>
            <td>9.2 minutes</td>
         </tr>
      </tbody>
   </table>
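
   To put the timings above in proportion, here is a minimal sketch of the arithmetic. The object and value names are illustrative only and are not part of the PR; the numbers are copied from the table. It works out to roughly 1.7% of the optimized run spent rebuilding the map, and an end-to-end speedup of roughly 2.4x.

```scala
// Illustrative only: names are hypothetical, numbers come from the table above.
object RebuildOverhead {
  val rebuildSeconds: Double = 4.0        // "Rebuilding the map"
  val enabledSeconds: Double = 3.9 * 60   // total time, optimization enabled
  val disabledSeconds: Double = 9.2 * 60  // total time, optimization disabled

  // Share of the optimized run that is spent rebuilding the map.
  val rebuildShare: Double = rebuildSeconds / enabledSeconds

  // End-to-end speedup of the optimized run over the unoptimized one.
  val speedup: Double = disabledSeconds / enabledSeconds

  def main(args: Array[String]): Unit = {
    println(f"rebuild share: ${rebuildShare * 100}%.1f%%")
    println(f"speedup: $speedup%.2fx")
  }
}
```

   This is a single test run, so the figures indicate the order of magnitude of the rebuild cost rather than a general guarantee.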
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


